You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucenenet.apache.org by ni...@apache.org on 2017/08/18 08:04:51 UTC
[04/20] lucenenet git commit: LUCENENET-565: Porting of Lucene
Replicator - Commit is for Review with comments about original Java Source
for assistance.
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs b/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
new file mode 100644
index 0000000..a9629b8
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
@@ -0,0 +1,276 @@
+//STATUS: DRAFT - 4.8.0
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Lucene.Net.Index;
+using Lucene.Net.Store;
+using Lucene.Net.Support;
+using Lucene.Net.Util;
+using Directory = Lucene.Net.Store.Directory;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ /// A {@link ReplicationHandler} for replication of an index and taxonomy pair.
+ /// See {@link IndexReplicationHandler} for more detail. This handler ensures
+ /// that the search and taxonomy indexes are replicated in a consistent way.
+ ///
+ /// <see cref="IndexReplicationHandler"/>
+ /// </summary>
+ /// <remarks>
+ /// If you intend to recreate a taxonomy index, you should make sure
+ /// to reopen an IndexSearcher and TaxonomyReader pair via the provided callback,
+ /// to guarantee that both indexes are in sync. This handler does not prevent
+ /// replicating such index and taxonomy pairs, and if they are reopened by a
+ /// different thread, unexpected errors can occur, as well as inconsistency
+ /// between the taxonomy and index readers.
+ ///
+ /// Lucene.Experimental
+ /// </remarks>
+ public class IndexAndTaxonomyReplicationHandler : IReplicationHandler
+ {
+ /// <summary>
+ /// The component used to log messages to the {@link InfoStream#getDefault()default} {@link InfoStream}.
+ /// </summary>
+ public const string INFO_STREAM_COMPONENT = "IndexAndTaxonomyReplicationHandler";
+
+ private readonly Directory indexDirectory;
+ private readonly Directory taxonomyDirectory;
+ private readonly Func<bool?> callback;
+
+ private InfoStream infoStream = InfoStream.Default;
+
+ public string CurrentVersion { get; private set; }
+ public IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles { get; private set; }
+ public InfoStream InfoStream
+ {
+ get { return infoStream; }
+ set { infoStream = value ?? InfoStream.NO_OUTPUT; }
+ }
+
+ /// <summary>
+ /// Constructor with the given index directory and callback to notify when the indexes were updated.
+ /// </summary>
+ /// <param name="indexDirectory"></param>
+ /// <param name="taxonomyDirectory"></param>
+ /// <param name="callback"></param>
+ /// <exception cref="System.IO.IOException"></exception>
+ public IndexAndTaxonomyReplicationHandler(Directory indexDirectory, Directory taxonomyDirectory, Func<bool?> callback)
+ {
+ this.indexDirectory = indexDirectory;
+ this.taxonomyDirectory = taxonomyDirectory;
+ this.callback = callback;
+
+ CurrentVersion = null;
+ CurrentRevisionFiles = null;
+
+ bool indexExists = DirectoryReader.IndexExists(indexDirectory);
+ bool taxonomyExists = DirectoryReader.IndexExists(taxonomyDirectory);
+
+ //JAVA: IllegalStateException
+ if (indexExists != taxonomyExists)
+ throw new InvalidOperationException(string.Format("search and taxonomy indexes must either both exist or not: index={0} taxo={1}", indexExists, taxonomyExists));
+
+ if (indexExists)
+ {
+ IndexCommit indexCommit = IndexReplicationHandler.GetLastCommit(indexDirectory);
+ IndexCommit taxonomyCommit = IndexReplicationHandler.GetLastCommit(taxonomyDirectory);
+
+ CurrentRevisionFiles = IndexAndTaxonomyRevision.RevisionFiles(indexCommit, taxonomyCommit);
+ CurrentVersion = IndexAndTaxonomyRevision.RevisionVersion(indexCommit, taxonomyCommit);
+
+ WriteToInfoStream(
+ string.Format("constructor(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles),
+ string.Format("constructor(): indexCommit={0} taxoCommit={1}", indexCommit, taxonomyCommit));
+ }
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="version"></param>
+ /// <param name="revisionFiles"></param>
+ /// <param name="copiedFiles"></param>
+ /// <param name="sourceDirectory"></param>
+ /// <exception cref=""></exception>
+ public void RevisionReady(string version,
+ IDictionary<string, IList<RevisionFile>> revisionFiles,
+ IDictionary<string, IList<string>> copiedFiles,
+ IDictionary<string, Directory> sourceDirectory)
+ {
+ #region Java
+ //JAVA: Directory taxoClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
+ //JAVA: Directory indexClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
+ //JAVA: List<String> taxoFiles = copiedFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
+ //JAVA: List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
+ //JAVA: String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
+ //JAVA: String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
+ //JAVA:
+ //JAVA: boolean success = false;
+ //JAVA: try {
+ //JAVA: // copy taxonomy files before index files
+ //JAVA: IndexReplicationHandler.copyFiles(taxoClientDir, taxoDir, taxoFiles);
+ //JAVA: IndexReplicationHandler.copyFiles(indexClientDir, indexDir, indexFiles);
+ //JAVA:
+ //JAVA: // fsync all copied files (except segmentsFile)
+ //JAVA: if (!taxoFiles.isEmpty()) {
+ //JAVA: taxoDir.sync(taxoFiles);
+ //JAVA: }
+ //JAVA: indexDir.sync(indexFiles);
+ //JAVA:
+ //JAVA: // now copy and fsync segmentsFile, taxonomy first because it is ok if a
+ //JAVA: // reader sees a more advanced taxonomy than the index.
+ //JAVA: if (taxoSegmentsFile != null) {
+ //JAVA: taxoClientDir.copy(taxoDir, taxoSegmentsFile, taxoSegmentsFile, IOContext.READONCE);
+ //JAVA: }
+ //JAVA: indexClientDir.copy(indexDir, indexSegmentsFile, indexSegmentsFile, IOContext.READONCE);
+ //JAVA:
+ //JAVA: if (taxoSegmentsFile != null) {
+ //JAVA: taxoDir.sync(Collections.singletonList(taxoSegmentsFile));
+ //JAVA: }
+ //JAVA: indexDir.sync(Collections.singletonList(indexSegmentsFile));
+ //JAVA:
+ //JAVA: success = true;
+ //JAVA: } finally {
+ //JAVA: if (!success) {
+ //JAVA: taxoFiles.add(taxoSegmentsFile); // add it back so it gets deleted too
+ //JAVA: IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
+ //JAVA: indexFiles.add(indexSegmentsFile); // add it back so it gets deleted too
+ //JAVA: IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
+ //JAVA: }
+ //JAVA: }
+ //JAVA:
+ //JAVA: // all files have been successfully copied + sync'd. update the handler's state
+ //JAVA: currentRevisionFiles = revisionFiles;
+ //JAVA: currentVersion = version;
+ //JAVA:
+ //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
+ //JAVA: + " currentRevisionFiles=" + currentRevisionFiles);
+ //JAVA: }
+ //JAVA:
+ //JAVA: // update the segments.gen file
+ //JAVA: IndexReplicationHandler.writeSegmentsGen(taxoSegmentsFile, taxoDir);
+ //JAVA: IndexReplicationHandler.writeSegmentsGen(indexSegmentsFile, indexDir);
+ //JAVA:
+ //JAVA: // Cleanup the index directory from old and unused index files.
+ //JAVA: // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ //JAVA: // side-effects, e.g. if it hits sudden IO errors while opening the index
+ //JAVA: // (and can end up deleting the entire index). It is not our job to protect
+ //JAVA: // against those errors, app will probably hit them elsewhere.
+ //JAVA: IndexReplicationHandler.cleanupOldIndexFiles(indexDir, indexSegmentsFile);
+ //JAVA: IndexReplicationHandler.cleanupOldIndexFiles(taxoDir, taxoSegmentsFile);
+ //JAVA:
+ //JAVA: // successfully updated the index, notify the callback that the index is
+ //JAVA: // ready.
+ //JAVA: if (callback != null) {
+ //JAVA: try {
+ //JAVA: callback.call();
+ //JAVA: } catch (Exception e) {
+ //JAVA: throw new IOException(e);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ Directory taxonomyClientDirectory = sourceDirectory[IndexAndTaxonomyRevision.TAXONOMY_SOURCE];
+ Directory indexClientDirectory = sourceDirectory[IndexAndTaxonomyRevision.INDEX_SOURCE];
+ IList<string> taxonomyFiles = copiedFiles[IndexAndTaxonomyRevision.TAXONOMY_SOURCE];
+ IList<string> indexFiles = copiedFiles[IndexAndTaxonomyRevision.INDEX_SOURCE];
+ string taxonomySegmentsFile = IndexReplicationHandler.GetSegmentsFile(taxonomyFiles, true);
+ string indexSegmentsFile = IndexReplicationHandler.GetSegmentsFile(indexFiles, false);
+
+ bool success = false;
+ try
+ {
+ // copy taxonomy files before index files
+ IndexReplicationHandler.CopyFiles(taxonomyClientDirectory, taxonomyDirectory, taxonomyFiles);
+ IndexReplicationHandler.CopyFiles(indexClientDirectory, indexDirectory, indexFiles);
+
+ // fsync all copied files (except segmentsFile)
+ if (taxonomyFiles.Any())
+ taxonomyDirectory.Sync(taxonomyFiles);
+ indexDirectory.Sync(indexFiles);
+
+ // now copy and fsync segmentsFile, taxonomy first because it is ok if a
+ // reader sees a more advanced taxonomy than the index.
+ if (taxonomySegmentsFile != null)
+ taxonomyClientDirectory.Copy(taxonomyDirectory, taxonomySegmentsFile, taxonomySegmentsFile, IOContext.READ_ONCE);
+ indexClientDirectory.Copy(indexDirectory, indexSegmentsFile, indexSegmentsFile, IOContext.READ_ONCE);
+
+ if (taxonomySegmentsFile != null)
+ taxonomyDirectory.Sync(new[] { taxonomySegmentsFile });
+ indexDirectory.Sync(new[] { indexSegmentsFile });
+
+ success = true;
+ }
+ finally
+ {
+ if (!success)
+ {
+ taxonomyFiles.Add(taxonomySegmentsFile); // add it back so it gets deleted too
+ IndexReplicationHandler.CleanupFilesOnFailure(taxonomyDirectory, taxonomyFiles);
+ indexFiles.Add(indexSegmentsFile); // add it back so it gets deleted too
+ IndexReplicationHandler.CleanupFilesOnFailure(indexDirectory, indexFiles);
+ }
+ }
+
+ // all files have been successfully copied + sync'd. update the handler's state
+ CurrentRevisionFiles = revisionFiles;
+ CurrentVersion = version;
+
+ WriteToInfoStream("revisionReady(): currentVersion=" + CurrentVersion + " currentRevisionFiles=" + CurrentRevisionFiles);
+
+ // update the segments.gen file
+ IndexReplicationHandler.WriteSegmentsGen(taxonomySegmentsFile, taxonomyDirectory);
+ IndexReplicationHandler.WriteSegmentsGen(indexSegmentsFile, indexDirectory);
+
+ // Cleanup the index directory from old and unused index files.
+ // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ // side-effects, e.g. if it hits sudden IO errors while opening the index
+ // (and can end up deleting the entire index). It is not our job to protect
+ // against those errors, app will probably hit them elsewhere.
+ IndexReplicationHandler.CleanupOldIndexFiles(indexDirectory, indexSegmentsFile);
+ IndexReplicationHandler.CleanupOldIndexFiles(taxonomyDirectory, taxonomySegmentsFile);
+
+ // successfully updated the index, notify the callback that the index is
+ // ready.
+ if (callback != null) {
+ try {
+ callback.Invoke();
+ } catch (Exception e) {
+ throw new IOException(e.Message, e);
+ }
+ }
+ }
+
+ private void WriteToInfoStream(params string[] messages)
+ {
+ if (!InfoStream.IsEnabled(INFO_STREAM_COMPONENT))
+ return;
+
+ foreach (string message in messages)
+ InfoStream.Message(INFO_STREAM_COMPONENT, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs b/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
new file mode 100644
index 0000000..8d32fac
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
@@ -0,0 +1,334 @@
+//STATUS: DRAFT - 4.8.0
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Lucene.Net.Facet.Taxonomy.Directory;
+using Lucene.Net.Facet.Taxonomy.WriterCache;
+using Lucene.Net.Index;
+using Lucene.Net.Store;
+using Directory = Lucene.Net.Store.Directory;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ /// A <see cref="IRevision"/> of a single index and taxonomy index files which comprises
+ /// the list of files from both indexes. This revision should be used whenever a
+ /// pair of search and taxonomy indexes need to be replicated together to
+ /// guarantee consistency of both on the replicating (client) side.
+ /// </summary>
+ /// <remarks>
+ /// Lucene.Experimental
+ /// </remarks>
+ public class IndexAndTaxonomyRevision : IRevision
+ {
+ #region Java
+ //JAVA: private final IndexWriter indexWriter;
+ //JAVA: private final SnapshotDirectoryTaxonomyWriter taxoWriter;
+ //JAVA: private final IndexCommit indexCommit, taxoCommit;
+ //JAVA: private final SnapshotDeletionPolicy indexSDP, taxoSDP;
+ //JAVA: private final String version;
+ //JAVA: private final Map<String, List<RevisionFile>> sourceFiles;
+ #endregion
+
+ public const string INDEX_SOURCE = "index";
+ public const string TAXONOMY_SOURCE = "taxonomy";
+
+ private readonly IndexWriter indexWriter;
+ private readonly SnapshotDirectoryTaxonomyWriter taxonomyWriter;
+ private readonly IndexCommit indexCommit, taxonomyCommit;
+ private readonly SnapshotDeletionPolicy indexSdp, taxonomySdp;
+
+ public string Version { get; private set; }
+ public IDictionary<string, IList<RevisionFile>> SourceFiles { get; private set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="indexWriter"></param>
+ /// <param name="taxonomyWriter"></param>
+ /// <exception cref="IOException"></exception>
+ public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxonomyWriter)
+ {
+ #region Java
+ //JAVA: /**
+ //JAVA: * Constructor over the given {@link IndexWriter}. Uses the last
+ //JAVA: * {@link IndexCommit} found in the {@link Directory} managed by the given
+ //JAVA: * writer.
+ //JAVA: */
+ //JAVA: public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxoWriter)
+ //JAVA: throws IOException {
+ //JAVA: IndexDeletionPolicy delPolicy = indexWriter.getConfig().getIndexDeletionPolicy();
+ //JAVA: if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ //JAVA: throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ //JAVA: }
+ //JAVA: this.indexWriter = indexWriter;
+ //JAVA: this.taxoWriter = taxoWriter;
+ //JAVA: this.indexSDP = (SnapshotDeletionPolicy) delPolicy;
+ //JAVA: this.taxoSDP = taxoWriter.getDeletionPolicy();
+ //JAVA: this.indexCommit = indexSDP.snapshot();
+ //JAVA: this.taxoCommit = taxoSDP.snapshot();
+ //JAVA: this.version = revisionVersion(indexCommit, taxoCommit);
+ //JAVA: this.sourceFiles = revisionFiles(indexCommit, taxoCommit);
+ //JAVA: }
+ #endregion
+
+ this.indexSdp = indexWriter.Config.IndexDeletionPolicy as SnapshotDeletionPolicy;
+ if (indexSdp == null)
+ throw new ArgumentException("IndexWriter must be created with SnapshotDeletionPolicy", "indexWriter");
+
+ this.indexWriter = indexWriter;
+ this.taxonomyWriter = taxonomyWriter;
+ this.taxonomySdp = taxonomyWriter.DeletionPolicy;
+ this.indexCommit = indexSdp.Snapshot();
+ this.taxonomyCommit = taxonomySdp.Snapshot();
+ this.Version = RevisionVersion(indexCommit, taxonomyCommit);
+ this.SourceFiles = RevisionFiles(indexCommit, taxonomyCommit);
+ }
+
+ public int CompareTo(string version)
+ {
+ #region Java
+ //JAVA: public int compareTo(String version) {
+ //JAVA: final String[] parts = version.split(":");
+ //JAVA: final long indexGen = Long.parseLong(parts[0], RADIX);
+ //JAVA: final long taxoGen = Long.parseLong(parts[1], RADIX);
+ //JAVA: final long indexCommitGen = indexCommit.getGeneration();
+ //JAVA: final long taxoCommitGen = taxoCommit.getGeneration();
+ //JAVA:
+ //JAVA: // if the index generation is not the same as this commit's generation,
+ //JAVA: // compare by it. Otherwise, compare by the taxonomy generation.
+ //JAVA: if (indexCommitGen < indexGen) {
+ //JAVA: return -1;
+ //JAVA: } else if (indexCommitGen > indexGen) {
+ //JAVA: return 1;
+ //JAVA: } else {
+ //JAVA: return taxoCommitGen < taxoGen ? -1 : (taxoCommitGen > taxoGen ? 1 : 0);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ string[] parts = version.Split(':');
+ long indexGen = long.Parse(parts[0], NumberStyles.HexNumber);
+ long taxonomyGen = long.Parse(parts[1], NumberStyles.HexNumber);
+ long indexCommitGen = indexCommit.Generation;
+ long taxonomyCommitGen = taxonomyCommit.Generation;
+
+ //TODO: long.CompareTo(); but which goes where.
+ if (indexCommitGen < indexGen)
+ return -1;
+
+ if (indexCommitGen > indexGen)
+ return 1;
+
+ return taxonomyCommitGen < taxonomyGen ? -1 : (taxonomyCommitGen > taxonomyGen ? 1 : 0);
+ }
+
+ public int CompareTo(IRevision other)
+ {
+ #region Java
+ //JAVA: public int compareTo(Revision o) {
+ //JAVA: IndexAndTaxonomyRevision other = (IndexAndTaxonomyRevision) o;
+ //JAVA: int cmp = indexCommit.compareTo(other.indexCommit);
+ //JAVA: return cmp != 0 ? cmp : taxoCommit.compareTo(other.taxoCommit);
+ //JAVA: }
+ #endregion
+
+ //TODO: This breaks the contract and will fail if called with a different implementation
+ // This is a flaw inherited from the original source...
+ // It should at least provide a better description to the InvalidCastException
+ IndexAndTaxonomyRevision or = (IndexAndTaxonomyRevision)other;
+ int cmp = indexCommit.CompareTo(or.indexCommit);
+ return cmp != 0 ? cmp : taxonomyCommit.CompareTo(or.taxonomyCommit);
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="source"></param>
+ /// <param name="fileName"></param>
+ /// <returns></returns>
+ /// <exception cref="IOException"></exception>
+ public Stream Open(string source, string fileName)
+ {
+ #region Java
+ //JAVA: public InputStream open(String source, String fileName) throws IOException {
+ //JAVA: assert source.equals(INDEX_SOURCE) || source.equals(TAXONOMY_SOURCE) : "invalid source; expected=(" + INDEX_SOURCE
+ //JAVA: + " or " + TAXONOMY_SOURCE + ") got=" + source;
+ //JAVA: IndexCommit ic = source.equals(INDEX_SOURCE) ? indexCommit : taxoCommit;
+ //JAVA: return new IndexInputStream(ic.getDirectory().openInput(fileName, IOContext.READONCE));
+ //JAVA: }
+ #endregion
+
+ Debug.Assert(source.Equals(INDEX_SOURCE) || source.Equals(TAXONOMY_SOURCE),
+ string.Format("invalid source; expected=({0} or {1}) got={2}", INDEX_SOURCE, TAXONOMY_SOURCE, source));
+ IndexCommit commit = source.Equals(INDEX_SOURCE) ? indexCommit : taxonomyCommit;
+ return new IndexInputStream(commit.Directory.OpenInput(fileName, IOContext.READ_ONCE));
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public void Release()
+ {
+ #region Java
+ //JAVA: public void release() throws IOException {
+ //JAVA: try {
+ //JAVA: indexSDP.release(indexCommit);
+ //JAVA: } finally {
+ //JAVA: taxoSDP.release(taxoCommit);
+ //JAVA: }
+ //JAVA:
+ //JAVA: try {
+ //JAVA: indexWriter.deleteUnusedFiles();
+ //JAVA: } finally {
+ //JAVA: taxoWriter.getIndexWriter().deleteUnusedFiles();
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ try
+ {
+ indexSdp.Release(indexCommit);
+ }
+ finally
+ {
+ taxonomySdp.Release(taxonomyCommit);
+ }
+
+ try
+ {
+ indexWriter.DeleteUnusedFiles();
+ }
+ finally
+ {
+ taxonomyWriter.IndexWriter.DeleteUnusedFiles();
+ }
+ }
+
+ //.NET NOTE: Changed doc comment as the JAVA one seems to be a bit too much copy/paste
+ /// <summary>
+ /// Returns a map of the revision files from the given <see cref="IndexCommit"/>s of the search and taxonomy indexes.
+ /// </summary>
+ /// <param name="indexCommit"></param>
+ /// <param name="taxonomyCommit"></param>
+ /// <returns></returns>
+ /// <exception cref="IOException"></exception>
+ public static IDictionary<string, IList<RevisionFile>> RevisionFiles(IndexCommit indexCommit, IndexCommit taxonomyCommit)
+ {
+ #region Java
+ //JAVA: /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ //JAVA: public static Map<String, List<RevisionFile>> revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit)
+ //JAVA: throws IOException {
+ //JAVA: HashMap<String,List<RevisionFile>> files = new HashMap<>();
+ //JAVA: files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next());
+ //JAVA: files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next());
+ //JAVA: return files;
+ //JAVA: }
+ #endregion
+
+ return new Dictionary<string, IList<RevisionFile>>{
+ { INDEX_SOURCE, IndexRevision.RevisionFiles(indexCommit).Values.First() },
+ { TAXONOMY_SOURCE, IndexRevision.RevisionFiles(taxonomyCommit).Values.First() }
+ };
+ }
+
+ /// <summary>
+ /// Returns a String representation of a revision's version from the given
+ /// <see cref="IndexCommit"/>s of the search and taxonomy indexes.
+ /// </summary>
+ /// <param name="commit"></param>
+ /// <returns>a String representation of a revision's version from the given <see cref="IndexCommit"/>s of the search and taxonomy indexes.</returns>
+ public static string RevisionVersion(IndexCommit indexCommit, IndexCommit taxonomyCommit)
+ {
+ #region Java
+ //JAVA: public static String revisionVersion(IndexCommit indexCommit, IndexCommit taxoCommit) {
+ //JAVA: return Long.toString(indexCommit.getGeneration(), RADIX) + ":" + Long.toString(taxoCommit.getGeneration(), RADIX);
+ //JAVA: }
+ #endregion
+
+ return string.Format("{0:X}:{1:X}", indexCommit.Generation, taxonomyCommit.Generation);
+ }
+
+ /// <summary>
+ /// A <seealso cref="DirectoryTaxonomyWriter"/> which sets the underlying
+ /// <seealso cref="IndexWriter"/>'s <seealso cref="IndexDeletionPolicy"/> to
+ /// <seealso cref="SnapshotDeletionPolicy"/>.
+ /// </summary>
+ public class SnapshotDirectoryTaxonomyWriter : DirectoryTaxonomyWriter
+ {
+ /// <summary>
+ /// Gets the <see cref="SnapshotDeletionPolicy"/> used by the underlying <see cref="IndexWriter"/>.
+ /// </summary>
+ public SnapshotDeletionPolicy DeletionPolicy { get; private set; }
+ /// <summary>
+ /// Gets the <see cref="IndexWriter"/> used by this <see cref="DirectoryTaxonomyWriter"/>.
+ /// </summary>
+ public IndexWriter IndexWriter { get; private set; }
+
+ /// <summary>
+ /// <see cref="DirectoryTaxonomyWriter(Directory, OpenMode, ITaxonomyWriterCache)"/>
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode, ITaxonomyWriterCache cache)
+ : base(directory, openMode, cache)
+ {
+ }
+
+ /// <summary>
+ /// <see cref="DirectoryTaxonomyWriter(Directory, OpenMode)"/>
+ /// </summary>
+ /// <exception cref="IOException"></exception>
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode = OpenMode.CREATE_OR_APPEND)
+ : base(directory, openMode)
+ {
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="openMode"></param>
+ /// <returns></returns>
+ protected override IndexWriterConfig CreateIndexWriterConfig(OpenMode openMode)
+ {
+ IndexWriterConfig conf = base.CreateIndexWriterConfig(openMode);
+ conf.IndexDeletionPolicy = DeletionPolicy = new SnapshotDeletionPolicy(conf.IndexDeletionPolicy);
+ return conf;
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="directory"></param>
+ /// <param name="config"></param>
+ /// <returns></returns>
+ protected override IndexWriter OpenIndexWriter(Directory directory, IndexWriterConfig config)
+ {
+ return IndexWriter = base.OpenIndexWriter(directory, config);
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexInputInputStream.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexInputInputStream.cs b/src/Lucene.Net.Replicator/IndexInputInputStream.cs
new file mode 100644
index 0000000..95f6e1c
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexInputInputStream.cs
@@ -0,0 +1,102 @@
+//STATUS: INPROGRESS - 4.8.0
+
+using System;
+using System.IO;
+using Lucene.Net.Store;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <remarks>
+ /// Lucene.Experimental
+ /// </remarks>
+ public class IndexInputStream : Stream
+ {
+ private readonly IndexInput input;
+ private long remaining;
+
+ public IndexInputStream(IndexInput input)
+ {
+ this.input = input;
+ remaining = input.Length;
+ }
+
+ public override void Flush()
+ {
+ throw new InvalidOperationException("Cannot flush a readonly stream.");
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ Position = offset;
+ break;
+ case SeekOrigin.Current:
+ Position += offset;
+ break;
+ case SeekOrigin.End:
+ Position = Length - offset;
+ break;
+ }
+ return Position;
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new InvalidOperationException("Cannot change length of a readonly stream.");
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ int remaining = (int) (input.Length - input.GetFilePointer());
+ input.ReadBytes(buffer, offset, Math.Min(remaining, count));
+ return remaining;
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new InvalidCastException("Cannot write to a readonly stream.");
+ }
+
+ public override bool CanRead { get { return true; } }
+ public override bool CanSeek { get { return true; } }
+ public override bool CanWrite { get { return false; } }
+ public override long Length { get { return input.Length; } }
+
+ public override long Position
+ {
+ get { return input.GetFilePointer(); }
+ set { input.Seek(value); }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ input.Dispose();
+ }
+ base.Dispose(disposing);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexReplicationHandler.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexReplicationHandler.cs b/src/Lucene.Net.Replicator/IndexReplicationHandler.cs
new file mode 100644
index 0000000..474361a
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexReplicationHandler.cs
@@ -0,0 +1,510 @@
+//STATUS: DRAFT - 4.8.0
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using Lucene.Net.Index;
+using Lucene.Net.Store;
+using Lucene.Net.Support;
+using Lucene.Net.Util;
+using Directory = Lucene.Net.Store.Directory;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ /// A <see cref="IReplicationHandler"/> for replication of an index. Implements
+ /// <see cref="RevisionReady"/> by copying the files pointed by the client resolver to
+ /// the index <see cref="Store.Directory"/> and then touches the index with
+ /// <see cref="IndexWriter"/> to make sure any unused files are deleted.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// This handler assumes that <see cref="IndexWriter"/> is not opened by
+ /// another process on the index directory. In fact, opening an
+ /// <see cref="IndexWriter"/> on the same directory to which files are copied can lead
+ /// to undefined behavior, where some or all the files will be deleted, override
+ /// other files or simply create a mess. When you replicate an index, it is best
+ /// if the index is never modified by <see cref="IndexWriter"/>, except the one that is
+ /// open on the source index, from which you replicate.
+ /// </para>
+ /// <para>
+ /// This handler notifies the application via a provided <see cref="Callable"/> when an
+ /// updated index commit was made available for it.
+ /// </para>
+ ///
+ /// Lucene.Experimental
+ /// </remarks>
+ public class IndexReplicationHandler : IReplicationHandler
+ {
+ /// <summary>
+ /// The component used to log messages to the {@link InfoStream#getDefault()
+ /// default} <seealso cref="InfoStream"/>.
+ /// </summary>
+ public const string INFO_STREAM_COMPONENT = "IndexReplicationHandler";
+
+ private readonly Directory indexDirectory;
+ private readonly Func<bool?> callback;
+ private InfoStream infoStream;
+
+ public string CurrentVersion { get; private set; }
+ public IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles { get; private set; }
+
+ public InfoStream InfoStream
+ {
+ get { return infoStream; }
+ set { infoStream = value ?? InfoStream.NO_OUTPUT; }
+ }
+
+ /// <summary>
+ /// Constructor with the given index directory and callback to notify when the
+ /// indexes were updated.
+ /// </summary>
+ /// <param name="indexDirectory"></param>
+ /// <param name="callback"></param>
+ // .NET NOTE: Java uses a Callable<Boolean>, however it is never uses the returned value?
+ public IndexReplicationHandler(Directory indexDirectory, Func<bool?> callback)
+ {
+ #region JAVA
+ //JAVA: this.callback = callback;
+ //JAVA: this.indexDir = indexDir;
+ //JAVA: currentRevisionFiles = null;
+ //JAVA: currentVersion = null;
+ //JAVA: if (DirectoryReader.indexExists(indexDir))
+ //JAVA: {
+ //JAVA: final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
+ //JAVA: final IndexCommit commit = commits.get(commits.size() - 1);
+ //JAVA: currentRevisionFiles = IndexRevision.revisionFiles(commit);
+ //JAVA: currentVersion = IndexRevision.revisionVersion(commit);
+ //JAVA: final InfoStream infoStream = InfoStream.getDefault();
+ //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT))
+ //JAVA: {
+ //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
+ //JAVA: + " currentRevisionFiles=" + currentRevisionFiles);
+ //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ this.InfoStream = InfoStream.Default;
+ this.callback = callback;
+ this.indexDirectory = indexDirectory;
+
+ CurrentVersion = null;
+ CurrentRevisionFiles = null;
+
+ if (DirectoryReader.IndexExists(indexDirectory))
+ {
+ IList<IndexCommit> commits = DirectoryReader.ListCommits(indexDirectory);
+ IndexCommit commit = commits.Last();
+
+ CurrentVersion = IndexRevision.RevisionVersion(commit);
+ CurrentRevisionFiles = IndexRevision.RevisionFiles(commit);
+
+ WriteToInfoStream(
+ string.Format("constructor(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles),
+ string.Format("constructor(): commit={0}", commit));
+ }
+ }
+
+ public void RevisionReady(string version,
+ IDictionary<string, IList<RevisionFile>> revisionFiles,
+ IDictionary<string, IList<string>> copiedFiles,
+ IDictionary<string, Directory> sourceDirectory)
+ {
+ #region Java
+ //JAVA: if (revisionFiles.size() > 1) {
+ //JAVA: throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
+ //JAVA: }
+ //JAVA:
+ //JAVA: Directory clientDir = sourceDirectory.values().iterator().next();
+ //JAVA: List<String> files = copiedFiles.values().iterator().next();
+ //JAVA: String segmentsFile = getSegmentsFile(files, false);
+ //JAVA:
+ //JAVA: boolean success = false;
+ //JAVA: try {
+ //JAVA: // copy files from the client to index directory
+ //JAVA: copyFiles(clientDir, indexDir, files);
+ //JAVA:
+ //JAVA: // fsync all copied files (except segmentsFile)
+ //JAVA: indexDir.sync(files);
+ //JAVA:
+ //JAVA: // now copy and fsync segmentsFile
+ //JAVA: clientDir.copy(indexDir, segmentsFile, segmentsFile, IOContext.READONCE);
+ //JAVA: indexDir.sync(Collections.singletonList(segmentsFile));
+ //JAVA:
+ //JAVA: success = true;
+ //JAVA: } finally {
+ //JAVA: if (!success) {
+ //JAVA: files.add(segmentsFile); // add it back so it gets deleted too
+ //JAVA: cleanupFilesOnFailure(indexDir, files);
+ //JAVA: }
+ //JAVA: }
+ //JAVA:
+ //JAVA: // all files have been successfully copied + sync'd. update the handler's state
+ //JAVA: currentRevisionFiles = revisionFiles;
+ //JAVA: currentVersion = version;
+ //JAVA:
+ //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
+ //JAVA: + " currentRevisionFiles=" + currentRevisionFiles);
+ //JAVA: }
+ //JAVA:
+ //JAVA: // update the segments.gen file
+ //JAVA: writeSegmentsGen(segmentsFile, indexDir);
+ //JAVA:
+ //JAVA: // Cleanup the index directory from old and unused index files.
+ //JAVA: // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ //JAVA: // side-effects, e.g. if it hits sudden IO errors while opening the index
+ //JAVA: // (and can end up deleting the entire index). It is not our job to protect
+ //JAVA: // against those errors, app will probably hit them elsewhere.
+ //JAVA: cleanupOldIndexFiles(indexDir, segmentsFile);
+ //JAVA:
+ //JAVA: // successfully updated the index, notify the callback that the index is
+ //JAVA: // ready.
+ //JAVA: if (callback != null) {
+ //JAVA: try {
+ //JAVA: callback.call();
+ //JAVA: } catch (Exception e) {
+ //JAVA: throw new IOException(e);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+ //TODO: ArgumentOutOfRangeException more suited?
+ if (revisionFiles.Count > 1) throw new ArgumentException(string.Format("this handler handles only a single source; got {0}", revisionFiles.Keys));
+
+ Directory clientDirectory = sourceDirectory.Values.First();
+ IList<string> files = copiedFiles.Values.First();
+ string segmentsFile = GetSegmentsFile(files, false);
+
+ bool success = false;
+ try
+ {
+ // copy files from the client to index directory
+ CopyFiles(clientDirectory, indexDirectory, files);
+
+ // fsync all copied files (except segmentsFile)
+ indexDirectory.Sync(files);
+
+ // now copy and fsync segmentsFile
+ clientDirectory.Copy(indexDirectory, segmentsFile, segmentsFile, IOContext.READ_ONCE);
+ indexDirectory.Sync(new[] { segmentsFile });
+
+ success = true;
+ }
+ finally
+ {
+ if (!success)
+ {
+ files.Add(segmentsFile); // add it back so it gets deleted too
+ CleanupFilesOnFailure(indexDirectory, files);
+ }
+ }
+
+ // all files have been successfully copied + sync'd. update the handler's state
+ CurrentRevisionFiles = revisionFiles;
+ CurrentVersion = version;
+
+ WriteToInfoStream(string.Format("revisionReady(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles));
+
+ // update the segments.gen file
+ WriteSegmentsGen(segmentsFile, indexDirectory);
+
+ // Cleanup the index directory from old and unused index files.
+ // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ // side-effects, e.g. if it hits sudden IO errors while opening the index
+ // (and can end up deleting the entire index). It is not our job to protect
+ // against those errors, app will probably hit them elsewhere.
+ CleanupOldIndexFiles(indexDirectory, segmentsFile);
+
+ // successfully updated the index, notify the callback that the index is
+ // ready.
+ if (callback != null) {
+ try {
+ callback.Invoke();
+ } catch (Exception e) {
+ throw new IOException(e.Message, e);
+ }
+ }
+ }
+
+ // .NET NOTE: Utility Method
+ private void WriteToInfoStream(params string[] messages)
+ {
+ if (!InfoStream.IsEnabled(INFO_STREAM_COMPONENT))
+ return;
+
+ foreach (string message in messages)
+ InfoStream.Message(INFO_STREAM_COMPONENT, message);
+ }
+
+ /// <summary>
+ /// Returns the last <see cref="IndexCommit"/> found in the <see cref="Directory"/>, or
+ /// <code>null</code> if there are no commits.
+ /// </summary>
+ /// <param name="directory"></param>
+ /// <returns></returns>
+ /// <exception cref="System.IO.IOException"></exception>
+ public static IndexCommit GetLastCommit(Directory directory)
+ {
+ #region Java
+ //JAVA: try {
+ //JAVA: if (DirectoryReader.indexExists(dir)) {
+ //JAVA: List<IndexCommit> commits = DirectoryReader.listCommits(dir);
+ //JAVA: // listCommits guarantees that we get at least one commit back, or
+ //JAVA: // IndexNotFoundException which we handle below
+ //JAVA: return commits.get(commits.size() - 1);
+ //JAVA: }
+ //JAVA: } catch (IndexNotFoundException e) {
+ //JAVA: // ignore the exception and return null
+ //JAVA: }
+ //JAVA: return null;
+ #endregion
+
+ try
+ {
+ // IndexNotFoundException which we handle below
+ return DirectoryReader.IndexExists(directory)
+ ? DirectoryReader.ListCommits(directory).Last()
+ : null;
+ }
+ catch (IndexNotFoundException)
+ {
+ // ignore the exception and return null
+ }
+ return null;
+ }
+
+ /// <summary>
+ /// Verifies that the last file is segments_N and fails otherwise. It also
+ /// removes and returns the file from the list, because it needs to be handled
+ /// last, after all files. This is important in order to guarantee that if a
+ /// reader sees the new segments_N, all other segment files are already on
+ /// stable storage.
+ /// <para>
+ /// The reason why the code fails instead of putting segments_N file last is
+ /// that this indicates an error in the Revision implementation.
+ /// </para>
+ /// </summary>
+ /// <param name="files"></param>
+ /// <param name="allowEmpty"></param>
+ /// <returns></returns>
+ public static string GetSegmentsFile(IList<string> files, bool allowEmpty)
+ {
+ #region Java
+ //JAVA: if (files.isEmpty()) {
+ //JAVA: if (allowEmpty) {
+ //JAVA: return null;
+ //JAVA: } else {
+ //JAVA: throw new IllegalStateException("empty list of files not allowed");
+ //JAVA: }
+ //JAVA: }
+ //JAVA:
+ //JAVA: String segmentsFile = files.remove(files.size() - 1);
+ //JAVA: if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.SEGMENTS_GEN)) {
+ //JAVA: throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
+ //JAVA: + "; check your Revision implementation!");
+ //JAVA: }
+ //JAVA: return segmentsFile;
+ #endregion
+
+ if (!files.Any())
+ {
+ if (allowEmpty)
+ return null;
+ throw new InvalidOperationException("empty list of files not allowed");
+ }
+
+ string segmentsFile = files.Last();
+ //NOTE: Relying on side-effects outside?
+ files.RemoveAt(files.Count - 1);
+ if (!segmentsFile.StartsWith(IndexFileNames.SEGMENTS) || segmentsFile.Equals(IndexFileNames.SEGMENTS_GEN))
+ {
+ throw new InvalidOperationException(
+ string.Format("last file to copy+sync must be segments_N but got {0}; check your Revision implementation!", segmentsFile));
+ }
+ return segmentsFile;
+ }
+
+ /// <summary>
+ /// Cleanup the index directory by deleting all given files. Called when file
+ /// copy or sync failed.
+ /// </summary>
+ /// <param name="directory"></param>
+ /// <param name="files"></param>
+ public static void CleanupFilesOnFailure(Directory directory, IList<string> files)
+ {
+ #region Java
+ //JAVA: for (String file : files) {
+ //JAVA: try {
+ //JAVA: dir.deleteFile(file);
+ //JAVA: } catch (Throwable t) {
+ //JAVA: // suppress any exception because if we're here, it means copy
+ //JAVA: // failed, and we must cleanup after ourselves.
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ foreach (string file in files)
+ {
+ try
+ {
+ directory.DeleteFile(file);
+ }
+ catch
+ {
+ // suppress any exception because if we're here, it means copy
+ // failed, and we must cleanup after ourselves.
+ }
+ }
+ }
+
+ public static void CleanupOldIndexFiles(Directory directory, string segmentsFile)
+ {
+ #region Java
+ //JAVA: try {
+ //JAVA: IndexCommit commit = getLastCommit(dir);
+ //JAVA: // commit == null means weird IO errors occurred, ignore them
+ //JAVA: // if there were any IO errors reading the expected commit point (i.e.
+ //JAVA: // segments files mismatch), then ignore that commit either.
+ //JAVA: if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
+ //JAVA: Set<String> commitFiles = new HashSet<>();
+ //JAVA: commitFiles.addAll(commit.getFileNames());
+ //JAVA: commitFiles.add(IndexFileNames.SEGMENTS_GEN);
+ //JAVA: Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
+ //JAVA: for (String file : dir.listAll()) {
+ //JAVA: if (!commitFiles.contains(file)
+ //JAVA: && (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) {
+ //JAVA: try {
+ //JAVA: dir.deleteFile(file);
+ //JAVA: } catch (Throwable t) {
+ //JAVA: // suppress, it's just a best effort
+ //JAVA: }
+ //JAVA: }
+ //JAVA: }
+ //JAVA: }
+ //JAVA: } catch (Throwable t) {
+ //JAVA: // ignore any errors that happens during this state and only log it. this
+ //JAVA: // cleanup will have a chance to succeed the next time we get a new
+ //JAVA: // revision.
+ //JAVA: }
+ #endregion
+
+ try
+ {
+ IndexCommit commit = GetLastCommit(directory);
+ // commit == null means weird IO errors occurred, ignore them
+ // if there were any IO errors reading the expected commit point (i.e.
+ // segments files mismatch), then ignore that commit either.
+
+ if (commit != null && commit.SegmentsFileName.Equals(segmentsFile))
+ {
+ HashSet<string> commitFiles = new HashSet<string>( commit.FileNames
+ .Union(new[] {IndexFileNames.SEGMENTS_GEN}));
+
+ Regex matcher = IndexFileNames.CODEC_FILE_PATTERN;
+ foreach (string file in directory.ListAll()
+ .Where(file => !commitFiles.Contains(file) && (matcher.IsMatch(file) || file.StartsWith(IndexFileNames.SEGMENTS))))
+ {
+ try
+ {
+ directory.DeleteFile(file);
+ }
+ catch
+ {
+ // suppress, it's just a best effort
+ }
+ }
+
+ }
+ }
+ catch
+ {
+ // ignore any errors that happens during this state and only log it. this
+ // cleanup will have a chance to succeed the next time we get a new
+ // revision.
+ }
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="source"></param>
+ /// <param name="target"></param>
+ /// <param name="files"></param>
+ /// <exception cref="System.IO.IOException"></exception>
+ public static void CopyFiles(Directory source, Directory target, IList<string> files)
+ {
+ #region Java
+ //JAVA: if (!source.equals(target)) {
+ //JAVA: for (String file : files) {
+ //JAVA: source.copy(target, file, file, IOContext.READONCE);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ if (source.Equals(target))
+ return;
+
+ foreach (string file in files)
+ source.Copy(target, file, file, IOContext.READ_ONCE);
+ }
+
+ /// <summary>
+ /// Writes <see cref="IndexFileNames.SEGMENTS_GEN"/> file to the directory, reading
+ /// the generation from the given <code>segmentsFile</code>. If it is <code>null</code>,
+ /// this method deletes segments.gen from the directory.
+ /// </summary>
+ /// <param name="segmentsFile"></param>
+ /// <param name="directory"></param>
+ public static void WriteSegmentsGen(string segmentsFile, Directory directory)
+ {
+ #region Java
+ //JAVA: public static void writeSegmentsGen(String segmentsFile, Directory dir) {
+ //JAVA: if (segmentsFile != null) {
+ //JAVA: SegmentInfos.writeSegmentsGen(dir, SegmentInfos.generationFromSegmentsFileName(segmentsFile));
+ //JAVA: } else {
+ //JAVA: try {
+ //JAVA: dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
+ //JAVA: } catch (Throwable t) {
+ //JAVA: // suppress any errors while deleting this file.
+ //JAVA: }
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ if (segmentsFile != null)
+ {
+ SegmentInfos.WriteSegmentsGen(directory, SegmentInfos.GenerationFromSegmentsFileName(segmentsFile));
+ return;
+ }
+
+ try
+ {
+ directory.DeleteFile(IndexFileNames.SEGMENTS_GEN);
+ }
+ catch
+ {
+ // suppress any errors while deleting this file.
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexRevision.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexRevision.cs b/src/Lucene.Net.Replicator/IndexRevision.cs
new file mode 100644
index 0000000..930b120
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexRevision.cs
@@ -0,0 +1,200 @@
+//STATUS: DRAFT - 4.8.0
+
+using System;
+using System.IO;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics;
+using System.Globalization;
+using System.Linq;
+using Lucene.Net.Index;
+using Lucene.Net.Store;
+using Directory = Lucene.Net.Store.Directory;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ /// A <see cref="IRevision"/> of a single index files which comprises the list of files
+ /// that are part of the current <see cref="IndexCommit"/>. To ensure the files are not
+ /// deleted by <see cref="IndexWriter"/> for as long as this revision stays alive (i.e.
+ /// until <see cref="Release"/>, the current commit point is snapshotted, using
+ /// <see cref="SnapshotDeletionPolicy"/> (this means that the given writer's
+ /// <see cref="IndexWriterConfig.IndexDeletionPolicy"/> should return
+ /// <see cref="SnapshotDeletionPolicy"/>).
+ /// <p>
+ /// When this revision is <see cref="Release"/>d, it releases the obtained
+ /// snapshot as well as calls <see cref="IndexWriter.DeleteUnusedFiles"/> so that the
+ /// snapshotted files are deleted (if they are no longer needed).
+ /// </p>
+ /// </summary>
+ /// <remarks>
+ /// Lucene.Experimental
+ /// </remarks>
+ public class IndexRevision : IRevision
+ {
+ #region Java
+ //JAVA: private static final int RADIX = 16;
+ //JAVA: private static final String SOURCE = "index";
+ //JAVA: private final IndexWriter writer;
+ //JAVA: private final IndexCommit commit;
+ //JAVA: private final SnapshotDeletionPolicy sdp;
+ //JAVA: private final String version;
+ //JAVA: private final Map<String, List<RevisionFile>> sourceFiles;
+ #endregion
+
+ private const string SOURCE = "index";
+
+ private readonly IndexWriter writer;
+ private readonly IndexCommit commit;
+ private readonly SnapshotDeletionPolicy sdp;
+
+ public string Version { get; private set; }
+ public IDictionary<string, IList<RevisionFile>> SourceFiles { get; private set; }
+
+ public IndexRevision(IndexWriter writer)
+ {
+ #region Java
+ //JAVA: public IndexRevision(IndexWriter writer) throws IOException {
+ //JAVA: IndexDeletionPolicy delPolicy = writer.getConfig().getIndexDeletionPolicy();
+ //JAVA: if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ //JAVA: throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ //JAVA: }
+ //JAVA: this.writer = writer;
+ //JAVA: this.sdp = (SnapshotDeletionPolicy) delPolicy;
+ //JAVA: this.commit = sdp.snapshot();
+ //JAVA: this.version = revisionVersion(commit);
+ //JAVA: this.sourceFiles = revisionFiles(commit);
+ //JAVA: }
+ #endregion
+
+ sdp = writer.Config.IndexDeletionPolicy as SnapshotDeletionPolicy;
+ if (sdp == null)
+ throw new ArgumentException("IndexWriter must be created with SnapshotDeletionPolicy", "writer");
+
+ this.writer = writer;
+ this.commit = sdp.Snapshot();
+ this.Version = RevisionVersion(commit);
+ this.SourceFiles = RevisionFiles(commit);
+ }
+
+ public int CompareTo(string version)
+ {
+ #region Java
+ //JAVA: long gen = Long.parseLong(version, RADIX);
+ //JAVA: long commitGen = commit.getGeneration();
+ //JAVA: return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0);
+ #endregion
+ long gen = long.Parse(version, NumberStyles.HexNumber);
+ long commitGen = commit.Generation;
+ //TODO: long.CompareTo(); but which goes where.
+ return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0);
+ }
+
+ public int CompareTo(IRevision other)
+ {
+ #region Java
+ //JAVA: IndexRevision other = (IndexRevision)o;
+ //JAVA: return commit.compareTo(other.commit);
+ #endregion
+ //TODO: This breaks the contract and will fail if called with a different implementation
+ // This is a flaw inherited from the original source...
+ // It should at least provide a better description to the InvalidCastException
+ IndexRevision or = (IndexRevision)other;
+ return commit.CompareTo(or.commit);
+ }
+
+ public Stream Open(string source, string fileName)
+ {
+ Debug.Assert(source.Equals(SOURCE), string.Format("invalid source; expected={0} got={1}", SOURCE, source));
+ return new IndexInputStream(commit.Directory.OpenInput(fileName, IOContext.READ_ONCE));
+ }
+
+ public void Release()
+ {
+ sdp.Release(commit);
+ writer.DeleteUnusedFiles();
+ }
+
+ public override string ToString()
+ {
+ return "IndexRevision version=" + Version + " files=" + SourceFiles;
+ }
+
+ // returns a RevisionFile with some metadata
+ private static RevisionFile CreateRevisionFile(string fileName, Directory directory)
+ {
+ #region Java
+ //JAVA: private static RevisionFile newRevisionFile(String file, Directory dir) throws IOException {
+ //JAVA: RevisionFile revFile = new RevisionFile(file);
+ //JAVA: revFile.size = dir.fileLength(file);
+ //JAVA: return revFile;
+ //JAVA: }
+ #endregion
+ return new RevisionFile(fileName, directory.FileLength(fileName));
+ }
+
+ /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ public static IDictionary<string, IList<RevisionFile>> RevisionFiles(IndexCommit commit)
+ {
+ #region Java
+ //JAVA: public static Map<String,List<RevisionFile>> revisionFiles(IndexCommit commit) throws IOException {
+ //JAVA: Collection<String> commitFiles = commit.getFileNames();
+ //JAVA: List<RevisionFile> revisionFiles = new ArrayList<>(commitFiles.size());
+ //JAVA: String segmentsFile = commit.getSegmentsFileName();
+ //JAVA: Directory dir = commit.getDirectory();
+ //JAVA:
+ //JAVA: for (String file : commitFiles) {
+ //JAVA: if (!file.equals(segmentsFile)) {
+ //JAVA: revisionFiles.add(newRevisionFile(file, dir));
+ //JAVA: }
+ //JAVA: }
+ //JAVA: revisionFiles.add(newRevisionFile(segmentsFile, dir)); // segments_N must be last
+ //JAVA: return Collections.singletonMap(SOURCE, revisionFiles);
+ //JAVA: }
+ #endregion
+
+ List<RevisionFile> revisionFiles = commit.FileNames
+ .Where(file => !string.Equals(file, commit.SegmentsFileName))
+ .Select(file => CreateRevisionFile(file, commit.Directory))
+ //Note: segments_N must be last
+ .Union(new[] {CreateRevisionFile(commit.SegmentsFileName, commit.Directory)})
+ .ToList();
+ return new Dictionary<string, IList<RevisionFile>>
+ {
+ { SOURCE, revisionFiles }
+ };
+ }
+
+ /// <summary>
+ /// Returns a String representation of a revision's version from the given <see cref="IndexCommit"/>
+ /// </summary>
+ /// <param name="commit"></param>
+ /// <returns></returns>
+ public static string RevisionVersion(IndexCommit commit)
+ {
+ #region Java
+ //JAVA: public static String revisionVersion(IndexCommit commit) {
+ //JAVA: return Long.toString(commit.getGeneration(), RADIX);
+ //JAVA: }
+ #endregion
+ return commit.Generation.ToString("X");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/LocalReplicator.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/LocalReplicator.cs b/src/Lucene.Net.Replicator/LocalReplicator.cs
new file mode 100644
index 0000000..ae3a3a9
--- /dev/null
+++ b/src/Lucene.Net.Replicator/LocalReplicator.cs
@@ -0,0 +1,416 @@
+//STATUS: DRAFT - 4.8.0
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using Lucene.Net.Search;
+using Lucene.Net.Support;
+
+namespace Lucene.Net.Replicator
+{
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /// <summary>
+ /// A <see cref="IReplicator"/> implementation for use by the side that publishes
+ /// <see cref="IRevision"/>s, as well for clients to <see cref="CheckForUpdate"/>
+ /// check for updates}. When a client needs to be updated, it is returned a
+ /// <see cref="SessionToken"/> through which it can
+ /// <see cref="ObtainFile"/> the files of that
+ /// revision. As long as a revision is being replicated, this replicator
+ /// guarantees that it will not be <seealso cref="IRevision.Release"/>.
+ /// <para>
+ /// Replication sessions expire by default after
+ /// <seealso cref="DEFAULT_SESSION_EXPIRATION_THRESHOLD"/>, and the threshold can be
+ /// configured through <seealso cref="ExpirationThreshold"/>.
+ /// </para>
+ /// </summary>
+ /// <remarks>
+ /// Lucene.Experimental
+ /// </remarks>
+ public class LocalReplicator : IReplicator
+ {
+ /// <summary>Threshold for expiring inactive sessions. Defaults to 30 minutes.</summary>
+ public const long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30;
+
+ private long expirationThreshold = DEFAULT_SESSION_EXPIRATION_THRESHOLD;
+ private readonly object padlock = new object();
+ private volatile RefCountedRevision currentRevision;
+ private volatile bool disposed = false;
+
+ private readonly AtomicInt32 sessionToken = new AtomicInt32(0);
+ private readonly Dictionary<string, ReplicationSession> sessions = new Dictionary<string, ReplicationSession>();
+
+ /// <summary>
+ /// Returns the expiration threshold.
+ /// </summary>
+ public long ExpirationThreshold
+ {
+ get { return expirationThreshold; }
+ set
+ {
+ lock (padlock)
+ {
+ EnsureOpen();
+ expirationThreshold = value;
+ CheckExpiredSessions();
+ }
+ }
+ }
+
+ public void Publish(IRevision revision)
+ {
+ #region Java
+ //JAVA: public synchronized void publish(Revision revision) throws IOException {
+ //JAVA: ensureOpen();
+ //JAVA: if (currentRevision != null) {
+ //JAVA: int compare = revision.compareTo(currentRevision.revision);
+ //JAVA: if (compare == 0) {
+ //JAVA: // same revision published again, ignore but release it
+ //JAVA: revision.release();
+ //JAVA: return;
+ //JAVA: }
+ //JAVA:
+ //JAVA: if (compare < 0) {
+ //JAVA: revision.release();
+ //JAVA: throw new IllegalArgumentException("Cannot publish an older revision: rev=" + revision + " current="
+ //JAVA: + currentRevision);
+ //JAVA: }
+ //JAVA: }
+ //JAVA:
+ //JAVA: // swap revisions
+ //JAVA: final RefCountedRevision oldRevision = currentRevision;
+ //JAVA: currentRevision = new RefCountedRevision(revision);
+ //JAVA: if (oldRevision != null) {
+ //JAVA: oldRevision.decRef();
+ //JAVA: }
+ //JAVA:
+ //JAVA: // check for expired sessions
+ //JAVA: checkExpiredSessions();
+ //JAVA: }
+ #endregion
+
+ lock (padlock)
+ {
+ EnsureOpen();
+
+ if (currentRevision != null)
+ {
+ int compare = revision.CompareTo(currentRevision.Revision);
+ if (compare == 0)
+ {
+ // same revision published again, ignore but release it
+ revision.Release();
+ return;
+ }
+
+ if (compare < 0)
+ {
+ revision.Release();
+ throw new ArgumentException(string.Format("Cannot publish an older revision: rev={0} current={1}", revision, currentRevision), "revision");
+ }
+ }
+
+ RefCountedRevision oldRevision = currentRevision;
+ currentRevision = new RefCountedRevision(revision);
+ if(oldRevision != null)
+ oldRevision.DecRef();
+
+ CheckExpiredSessions();
+ }
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="currentVersion"></param>
+ /// <returns></returns>
+ public SessionToken CheckForUpdate(string currentVersion)
+ {
+ #region Java
+ //JAVA: public synchronized SessionToken checkForUpdate(String currentVersion) {
+ //JAVA: ensureOpen();
+ //JAVA: if (currentRevision == null) { // no published revisions yet
+ //JAVA: return null;
+ //JAVA: }
+ //JAVA:
+ //JAVA: if (currentVersion != null && currentRevision.revision.compareTo(currentVersion) <= 0) {
+ //JAVA: // currentVersion is newer or equal to latest published revision
+ //JAVA: return null;
+ //JAVA: }
+ //JAVA:
+ //JAVA: // currentVersion is either null or older than latest published revision
+ //JAVA: currentRevision.incRef();
+ //JAVA: final String sessionID = Integer.toString(sessionToken.incrementAndGet());
+ //JAVA: final SessionToken sessionToken = new SessionToken(sessionID, currentRevision.revision);
+ //JAVA: final ReplicationSession timedSessionToken = new ReplicationSession(sessionToken, currentRevision);
+ //JAVA: sessions.put(sessionID, timedSessionToken);
+ //JAVA: return sessionToken;
+ //JAVA: }
+ #endregion
+
+ lock (padlock)
+ {
+ EnsureOpen();
+ if (currentRevision == null)
+ return null; // no published revisions yet
+
+ if (currentVersion != null && currentRevision.Revision.CompareTo(currentVersion) <= 0)
+ return null; // currentVersion is newer or equal to latest published revision
+
+ // currentVersion is either null or older than latest published revision
+ currentRevision.IncRef();
+
+ string sessionID = sessionToken.IncrementAndGet().ToString();
+ SessionToken token = new SessionToken(sessionID, currentRevision.Revision);
+ sessions[sessionID] = new ReplicationSession(token, currentRevision);
+ return token;
+ }
+
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="sessionId"></param>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void Release(string sessionId)
+ {
+ lock (padlock)
+ {
+ EnsureOpen();
+ ReleaseSession(sessionId);
+ }
+ }
+
+ public Stream ObtainFile(string sessionId, string source, string fileName)
+ {
+ #region Java
+ //JAVA: public synchronized InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ //JAVA: ensureOpen();
+ //JAVA: ReplicationSession session = sessions.get(sessionID);
+ //JAVA: if (session != null && session.isExpired(expirationThresholdMilllis)) {
+ //JAVA: releaseSession(sessionID);
+ //JAVA: session = null;
+ //JAVA: }
+ //JAVA: // session either previously expired, or we just expired it
+ //JAVA: if (session == null) {
+ //JAVA: throw new SessionExpiredException("session (" + sessionID + ") expired while obtaining file: source=" + source
+ //JAVA: + " file=" + fileName);
+ //JAVA: }
+ //JAVA: sessions.get(sessionID).markAccessed();
+ //JAVA: return session.revision.revision.open(source, fileName);
+ //JAVA: }
+ #endregion
+
+ lock (padlock)
+ {
+ EnsureOpen();
+
+ ReplicationSession session = sessions[sessionId];
+ if (session != null && session.IsExpired(ExpirationThreshold))
+ {
+ ReleaseSession(sessionId);
+ session = null;
+ }
+ // session either previously expired, or we just expired it
+ if (session == null)
+ {
+ throw new SessionExpiredException(string.Format("session ({0}) expired while obtaining file: source={1} file={2}", sessionId, source, fileName));
+ }
+ sessions[sessionId].MarkAccessed();
+ return session.Revision.Revision.Open(source, fileName);
+ }
+
+ }
+
+ public void Dispose()
+ {
+ #region Java
+ //JAVA: public synchronized void close() throws IOException {
+ //JAVA: if (!closed) {
+ //JAVA: // release all managed revisions
+ //JAVA: for (ReplicationSession session : sessions.values()) {
+ //JAVA: session.revision.decRef();
+ //JAVA: }
+ //JAVA: sessions.clear();
+ //JAVA: closed = true;
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ if (disposed)
+ return;
+
+ lock (padlock)
+ {
+ foreach (ReplicationSession session in sessions.Values)
+ session.Revision.DecRef();
+ sessions.Clear();
+ }
+ disposed = true;
+ }
+
+ /// <exception cref="InvalidOperationException"></exception>
+ private void CheckExpiredSessions()
+ {
+ #region Java
+ //JAVA: private void checkExpiredSessions() throws IOException {
+ //JAVA: // make a "to-delete" list so we don't risk deleting from the map while iterating it
+ //JAVA: final ArrayList<ReplicationSession> toExpire = new ArrayList<>();
+ //JAVA: for (ReplicationSession token : sessions.values()) {
+ //JAVA: if (token.isExpired(expirationThresholdMilllis)) {
+ //JAVA: toExpire.add(token);
+ //JAVA: }
+ //JAVA: }
+ //JAVA: for (ReplicationSession token : toExpire) {
+ //JAVA: releaseSession(token.session.id);
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ // .NET NOTE: .ToArray() so we don't modify a collection we are enumerating...
+ // I am wondering if it would be overall more practical to switch to a concurrent dictionary...
+ foreach (ReplicationSession token in sessions.Values.Where(token => token.IsExpired(ExpirationThreshold)).ToArray())
+ {
+ ReleaseSession(token.Session.Id);
+ }
+ }
+
+ /// <exception cref="InvalidOperationException"></exception>
+ private void ReleaseSession(string sessionId)
+ {
+ #region Java
+ //JAVA: private void releaseSession(String sessionID) throws IOException {
+ //JAVA: ReplicationSession session = sessions.remove(sessionID);
+ //JAVA: // if we're called concurrently by close() and release(), could be that one
+ //JAVA: // thread beats the other to release the session.
+ //JAVA: if (session != null) {
+ //JAVA: session.revision.decRef();
+ //JAVA: }
+ //JAVA: }
+ #endregion
+
+ ReplicationSession session;
+ // if we're called concurrently by close() and release(), could be that one
+ // thread beats the other to release the session.
+ if (sessions.TryGetValue(sessionId, out session))
+ {
+ sessions.Remove(sessionId);
+ session.Revision.DecRef();
+ }
+ }
+
+ /// <summary>
+ /// Ensure that replicator is still open, or throw <see cref="ObjectDisposedException"/> otherwise.
+ /// </summary>
+ /// <exception cref="ObjectDisposedException">This replicator has already been closed</exception>
+ protected void EnsureOpen()
+ {
+ lock (padlock)
+ {
+ if (!disposed)
+ return;
+
+ throw new ObjectDisposedException("This replicator has already been disposed");
+ }
+ }
+
+ private class RefCountedRevision
+ {
+ private readonly AtomicInt32 refCount = new AtomicInt32(1);
+
+ public IRevision Revision { get; private set; }
+
+ public RefCountedRevision(IRevision revision)
+ {
+ Revision = revision;
+ }
+
+ /// <summary/>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void DecRef()
+ {
+ if (refCount.Get() <= 0)
+ {
+ //JAVA: throw new IllegalStateException("this revision is already released");
+ throw new InvalidOperationException("this revision is already released");
+ }
+
+ var rc = refCount.DecrementAndGet();
+ if (rc == 0)
+ {
+ bool success = false;
+ try
+ {
+ Revision.Release();
+ success = true;
+ }
+ finally
+ {
+ if (!success)
+ {
+ // Put reference back on failure
+ refCount.IncrementAndGet();
+ }
+ }
+ }
+ else if (rc < 0)
+ {
+ //JAVA: throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");
+ throw new InvalidOperationException(string.Format("too many decRef calls: refCount is {0} after decrement", rc));
+ }
+ }
+
+ public void IncRef()
+ {
+ refCount.IncrementAndGet();
+ }
+ }
+
+ private class ReplicationSession
+ {
+ public SessionToken Session { get; private set; }
+ public RefCountedRevision Revision { get; private set; }
+
+ private long lastAccessTime;
+
+ public ReplicationSession(SessionToken session, RefCountedRevision revision)
+ {
+ Session = session;
+ Revision = revision;
+ //JAVA: lastAccessTime = System.currentTimeMillis();
+ lastAccessTime = Stopwatch.GetTimestamp();
+ }
+
+ public bool IsExpired(long expirationThreshold)
+ {
+ //JAVA: return lastAccessTime < (System.currentTimeMillis() - expirationThreshold);
+ return lastAccessTime < Stopwatch.GetTimestamp() - expirationThreshold * Stopwatch.Frequency / 1000;
+ }
+
+ public void MarkAccessed()
+ {
+ //JAVA: lastAccessTime = System.currentTimeMillis();
+ lastAccessTime = Stopwatch.GetTimestamp();
+ }
+ }
+
+ }
+}
\ No newline at end of file