You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:36 UTC
[02/31] lucene-solr git commit: current patch
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
new file mode 100644
index 0000000..4e29508
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -0,0 +1,238 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+
+/** The parent JVM holds this "wrapper" to refer to each child JVM. This is roughly equivalent to, e.g., a client-side "sugar" API. */
+class NodeProcess implements Closeable {
+ // Handle to the child JVM process; destroyed by crash() and waited on by shutdown():
+ final Process p;
+
+ // Port sub-process is listening on
+ final int tcpPort;
+
+ // Node id, used in toString() and in log messages:
+ final int id;
+
+ // Joined after the child process is destroyed (crash) or has exited (shutdown); presumably the thread
+ // pumping the child's output -- confirm against the code that launches the child:
+ final Thread pumper;
+
+ // Acquired when searching or indexing wants to use this node:
+ final ReentrantLock lock;
+
+ // True if the child is the primary node; only the primary accepts indexing and flush requests:
+ final boolean isPrimary;
+
+ // Version in the commit point we opened on init:
+ final long initCommitVersion;
+
+ // SegmentInfos.version, which can be higher than the initCommitVersion
+ final long initInfosVersion;
+
+ // Cleared by crash() and shutdown() so that teardown is only attempted once:
+ volatile boolean isOpen = true;
+
+ /** Wraps an existing child process: records the arguments as-is and creates the lock.
+  *  Asserts that {@code initInfosVersion} is not lower than {@code initCommitVersion}. */
+ public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion) {
+ this.p = p;
+ this.id = id;
+ this.tcpPort = tcpPort;
+ this.pumper = pumper;
+ this.isPrimary = isPrimary;
+ this.initCommitVersion = initCommitVersion;
+ this.initInfosVersion = initInfosVersion;
+ assert initInfosVersion >= initCommitVersion: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
+ lock = new ReentrantLock();
+ }
+
+ @Override
+ public String toString() {
+ if (isPrimary) {
+ return "P" + id + " tcpPort=" + tcpPort;
+ } else {
+ return "R" + id + " tcpPort=" + tcpPort;
+ }
+ }
+
+ /** Destroys the child process without asking it to close, then waits for the pumper thread to finish.
+  *  Does nothing if the node is no longer open. */
+ public synchronized void crash() {
+   if (isOpen == false) {
+     return;
+   }
+
+   isOpen = false;
+   p.destroy();
+   try {
+     pumper.join();
+   } catch (InterruptedException interrupted) {
+     // Restore the interrupt flag before surfacing the failure:
+     Thread.currentThread().interrupt();
+     throw new RuntimeException(interrupted);
+   }
+ }
+
+ public boolean commit() {
+ try (Connection c = new Connection(tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
+ c.flush();
+ c.s.shutdownOutput();
+ if (c.in.readByte() != 1) {
+ throw new RuntimeException("commit failed");
+ }
+ return true;
+ } catch (Throwable t) {
+ // Something wrong with this replica; skip it:
+ System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
+ return false;
+ }
+ }
+
+ public void commitAsync() {
+ try (Connection c = new Connection(tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
+ c.flush();
+ } catch (Throwable t) {
+ // Something wrong with this replica; skip it:
+ System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
+ }
+ }
+
+ /** Asks the child which version it is currently searching.
+  *
+  *  @return the version reported by the child, or -1 if the request failed for any reason (the failure is logged) */
+ public long getSearchingVersion() {
+   try (Connection c = new Connection(tcpPort)) {
+     c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION);
+     c.flush();
+     c.s.shutdownOutput();
+     return c.in.readVLong();
+   } catch (Throwable t) {
+     // Something wrong with this replica; skip it.  Log the cause too (as commit() does), otherwise the
+     // failure cannot be diagnosed from the test output:
+     System.out.println("PARENT: top: hit SocketException during getSearchingVersion with R" + id + ": " + t + "; skipping");
+     return -1L;
+   }
+ }
+
+ /** Ask the primary node process to flush. We send it all currently up replicas so it can notify them about the new NRT point. Returns the newly
+ * flushed version, or a negative (current) version if there were no changes. */
+ public synchronized long flush() throws IOException {
+ assert isPrimary;
+ try (Connection c = new Connection(tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
+ c.flush();
+ c.s.shutdownOutput();
+ return c.in.readLong();
+ }
+ }
+
+ /** Same as {@link #shutdown()}, ignoring its result. */
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ /** Asks the child process to shut down gracefully by sending it CMD_CLOSE, then waits for the process to exit and for the
+  *  pumper thread to finish. Holds {@code lock} for the whole call. Does nothing if the node is no longer open.
+  *
+  *  @return false if sending CMD_CLOSE (or reading its acknowledgement) failed; true otherwise, including when the node
+  *  was already closed */
+ public synchronized boolean shutdown() {
+ lock.lock();
+ try {
+ System.out.println("PARENT: now shutdown node=" + id + " isOpen=" + isOpen);
+ if (isOpen) {
+ // Ask the child process to shutdown gracefully:
+ isOpen = false;
+ System.out.println("PARENT: send CMD_CLOSE to node=" + id);
+ try (Connection c = new Connection(tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_CLOSE);
+ c.flush();
+ if (c.in.readByte() != 1) {
+ throw new RuntimeException("shutdown failed");
+ }
+ } catch (Throwable t) {
+ // NOTE(review): isOpen is already false here, so after this early return neither crash() nor a second shutdown()
+ // will destroy or wait for the child process -- confirm that callers deal with a false return themselves.
+ System.out.println("top: shutdown failed; skipping");
+ t.printStackTrace(System.out);
+ return false;
+ }
+ try {
+ // The child acknowledged the close; wait for it to actually exit:
+ p.waitFor();
+ pumper.join();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ }
+
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** Sends one add (or, when {@code isUpdate} is true, update) document command over the given connection and then reads
+  *  the one-byte reply. Only the {@code docid}, {@code title}, {@code body} and {@code marker} fields of the document are
+  *  sent; {@code docid} is required (asserted), the other three are sent only when present.
+  *
+  *  @throws IllegalStateException if this node is not the primary */
+ public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
+   if (isPrimary == false) {
+     throw new IllegalStateException("only primary can index");
+   }
+
+   String title = doc.get("title");
+   String docid = doc.get("docid");
+   assert docid != null;
+   String body = doc.get("body");
+   String marker = doc.get("marker");
+
+   // Optional fields, in the order they go on the wire (right after docid):
+   String[] optionalNames = {"title", "body", "marker"};
+   String[] optionalValues = {title, body, marker};
+
+   // docid is always sent, so start counting at 1:
+   int fieldCount = 1;
+   for (String value : optionalValues) {
+     if (value != null) {
+       fieldCount++;
+     }
+   }
+
+   c.out.writeByte(isUpdate ? SimplePrimaryNode.CMD_UPDATE_DOC : SimplePrimaryNode.CMD_ADD_DOC);
+   c.out.writeVInt(fieldCount);
+   c.out.writeString("docid");
+   c.out.writeString(docid);
+   for (int i = 0; i < optionalNames.length; i++) {
+     if (optionalValues[i] != null) {
+       c.out.writeString(optionalNames[i]);
+       c.out.writeString(optionalValues[i]);
+     }
+   }
+   c.flush();
+
+   // Wait for the reply byte; its value is not checked:
+   c.in.readByte();
+ }
+
+ /** Sends a delete command for the given docid over the given connection and then reads the one-byte reply (its value is
+  *  not checked).
+  *
+  *  @throws IllegalStateException if this node is not the primary */
+ public void deleteDocument(Connection c, String docid) throws IOException {
+   if (!isPrimary) {
+     throw new IllegalStateException("only primary can index");
+   }
+
+   c.out.writeByte(SimplePrimaryNode.CMD_DELETE_DOC);
+   c.out.writeString(docid);
+   c.flush();
+
+   c.in.readByte();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
new file mode 100644
index 0000000..1180967
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
@@ -0,0 +1,294 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Handles one set of files that need copying, either because we have a
+ * new NRT point, or we are pre-copying merged files for merge warming. */
+class SimpleCopyJob extends CopyJob {
+ // Connection the file requests are written to and the file bytes are read from; closed when all files are done or the
+ // job is cancelled:
+ final Connection c;
+
+ // Scratch buffer handed to each CopyOneFile this job creates:
+ final byte[] copyBuffer = new byte[65536];
+ // Returned as-is by getCopyState():
+ final CopyState copyState;
+
+ // Walks toCopy; null until start() has been called:
+ private Iterator<Map.Entry<String,FileMetaData>> iter;
+
+ /** Creates the job; nothing is sent over the connection until {@link #start()} is called. All arguments other than
+  *  {@code c} and {@code copyState} are passed through to the {@code CopyJob} constructor. */
+ public SimpleCopyJob(String reason, Connection c, CopyState copyState, SimpleReplicaNode dest, Map<String,FileMetaData> files, boolean highPriority, OnceDone onceDone)
+ throws IOException {
+ super(reason, files, dest, highPriority, onceDone);
+ dest.message("create SimpleCopyJob o" + ord);
+ this.c = c;
+ this.copyState = copyState;
+ }
+
+ /** Sends the complete list of requested files (name plus starting offset) over the connection in one go and shuts down
+  *  the output side of the socket. If a file copy is being resumed ({@code current != null}) it is requested first, from
+  *  the number of bytes already copied, and its length as reported by the other side is verified. May only be called once.
+  *
+  *  @throws IllegalStateException if the job was already started
+  *  @throws NodeCommunicationException if anything fails while sending; the job is cancelled first */
+ @Override
+ public synchronized void start() throws IOException {
+   if (iter != null) {
+     throw new IllegalStateException("already started");
+   }
+   iter = toCopy.iterator();
+
+   // Send all file names / offsets up front to avoid ping-pong latency:
+   try {
+
+     // This means we resumed an already in-progress copy; we do this one first:
+     if (current != null) {
+       c.out.writeByte((byte) 0);
+       c.out.writeString(current.name);
+       c.out.writeVLong(current.getBytesCopied());
+       totBytes += current.metaData.length;
+     }
+
+     // All remaining files are requested from offset 0:
+     for (Map.Entry<String,FileMetaData> request : toCopy) {
+       totBytes += request.getValue().length;
+       c.out.writeByte((byte) 0);
+       c.out.writeString(request.getKey());
+       c.out.writeVLong(0);
+     }
+
+     // A 1 byte terminates the request list:
+     c.out.writeByte((byte) 1);
+     c.flush();
+     c.s.shutdownOutput();
+
+     if (current != null) {
+       // Do this only at the end, after sending all requested files, so we don't deadlock due to socket buffering waiting for primary to
+       // send us this length:
+       long remoteLength = c.in.readVLong();
+       if (remoteLength != current.metaData.length) {
+         throw new IllegalStateException("file " + current.name + ": meta data says length=" + current.metaData.length + " but c.in says " + remoteLength);
+       }
+     }
+
+     dest.message("SimpleCopyJob.init: done start files count=" + toCopy.size() + " totBytes=" + totBytes);
+
+   } catch (Throwable failure) {
+     cancel("exc during start", failure);
+     throw new NodeCommunicationException("exc during start", failure);
+   }
+ }
+
+ /** Returns the running total of bytes copied; visit() adds to it each time a file finishes copying. */
+ @Override
+ public long getTotalBytesCopied() {
+ return totBytesCopied;
+ }
+
+ @Override
+ public Set<String> getFileNamesToCopy() {
+ Set<String> fileNames = new HashSet<>();
+ for(Map.Entry<String,FileMetaData> ent : toCopy) {
+ fileNames.add(ent.getKey());
+ }
+ return fileNames;
+ }
+
+ /** Returns the key set of {@code files}; this is a live view of the map, not a copy. */
+ @Override
+ public Set<String> getFileNames() {
+ return files.keySet();
+ }
+
+ /** Higher priority and then "first come first serve" order: a high-priority job sorts before a low-priority one, and
+  *  among jobs of equal priority the one with the smaller {@code ord} sorts first.
+  *
+  *  @throws ClassCastException if the other job is not a {@code SimpleCopyJob} */
+ @Override
+ public int compareTo(CopyJob _other) {
+   SimpleCopyJob other = (SimpleCopyJob) _other;
+   if (highPriority != other.highPriority) {
+     return highPriority ? -1 : 1;
+   } else if (ord < other.ord) {
+     return -1;
+   } else if (ord > other.ord) {
+     // Must be positive, not 0: otherwise a.compareTo(b) < 0 while b.compareTo(a) == 0, which breaks the
+     // Comparable contract and can mis-order jobs in sorted collections / priority queues.
+     return 1;
+   } else {
+     return 0;
+   }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ dest.message(String.format(Locale.ROOT,
+ "top: file copy done; took %.1f msec to copy %d bytes; now rename %d tmp files",
+ (System.nanoTime() - startNS)/1000000.0,
+ totBytesCopied,
+ copiedFiles.size()));
+
+ // NOTE: if any of the files we copied overwrote a file in the current commit point, we (ReplicaNode) removed the commit point up
+ // front so that the commit is not corrupt. This way if we hit exc here, or if we crash here, we won't leave a corrupt commit in
+ // the index:
+ for(Map.Entry<String,String> ent : copiedFiles.entrySet()) {
+ String tmpFileName = ent.getValue();
+ String fileName = ent.getKey();
+
+ // Tricky: if primary crashes while warming (pre-copying) a merged segment _X, the new primary can easily flush or merge to _X (since we don't
+ // have a distributed inflateGens for the new primary) and _X file names will be reused. In this case, our local deleter will be
+ // thinking it must remove _X's files (from the warmed merge that never went live), but this is dangerous when virus checker is active
+ // since deleter may finally succeed in deleting the file after we have copied the new _X flushed files. So at this point was ask the
+ // deleter to NOT delete the file anymore:
+ dest.deleter.clearPending(Collections.singleton(fileName));
+
+ if (Node.VERBOSE_FILES) {
+ dest.message("rename file " + tmpFileName + " to " + fileName);
+ }
+
+ // NOTE: if this throws exception, then some files have been moved to their true names, and others are leftover .tmp files. I don't
+ // think heroic exception handling is necessary (no harm will come, except some leftover files), nor warranted here (would make the
+ // code more complex, for the exceptional cases when something is wrong w/ your IO system):
+ dest.dir.renameFile(tmpFileName, fileName);
+ }
+
+ copiedFiles.clear();
+ }
+
+ /** Do an iota of work; returns true if all copying is done (or the job was cancelled). */
+ synchronized boolean visit() throws IOException {
+   if (exc != null) {
+     // We were externally cancelled:
+     return true;
+   }
+
+   if (current == null) {
+     if (!iter.hasNext()) {
+       // No file in progress and none left to start:
+       c.close();
+       return true;
+     }
+
+     Map.Entry<String,FileMetaData> entry = iter.next();
+     String fileName = entry.getKey();
+     FileMetaData metaData = entry.getValue();
+
+     // The length read from the connection must agree with the meta data we were given:
+     long remoteLength = c.in.readVLong();
+     if (remoteLength != metaData.length) {
+       throw new IllegalStateException("file " + fileName + ": meta data says length=" + metaData.length + " but c.in says " + remoteLength);
+     }
+     current = new CopyOneFile(c.in, dest, fileName, metaData, copyBuffer);
+   }
+
+   boolean fileDone = current.visit();
+   if (fileDone) {
+     // This file is done copying
+     copiedFiles.put(current.name, current.tmpName);
+     totBytesCopied += current.getBytesCopied();
+     assert totBytesCopied <= totBytes: "totBytesCopied=" + totBytesCopied + " totBytes=" + totBytes;
+     current = null;
+   }
+
+   // Even right after a file finishes we report "not done"; a later call sees the exhausted iterator and returns true:
+   return false;
+ }
+
+ /** Creates a CopyOneFile from a previous one, reading from this job's connection input. */
+ protected CopyOneFile newCopyOneFile(CopyOneFile prev) {
+ return new CopyOneFile(prev, c.in);
+ }
+
+ /** Delegates to the superclass and then, whether or not that threw, closes the previous job's connection (ignoring any
+  *  exception from the close). The previous job must be a {@code SimpleCopyJob}. */
+ @Override
+ public synchronized void transferAndCancel(CopyJob prevJob) throws IOException {
+ try {
+ super.transferAndCancel(prevJob);
+ } finally {
+ IOUtils.closeWhileHandlingException(((SimpleCopyJob) prevJob).c);
+ }
+ }
+
+ /** Delegates to the superclass and then, whether or not that threw, closes this job's connection (ignoring any exception
+  *  from the close). */
+ public synchronized void cancel(String reason, Throwable exc) {
+ try {
+ super.cancel(reason, exc);
+ } finally {
+ IOUtils.closeWhileHandlingException(c);
+ }
+ }
+
+ /** Returns true once {@code exc} has been set. */
+ @Override
+ public boolean getFailed() {
+ return exc != null;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleCopyJob(ord=" + ord + " " + reason + " highPriority=" + highPriority + " files count=" + files.size() + " bytesCopied=" + totBytesCopied + " (of " + totBytes + ") filesCopied=" + copiedFiles.size() + ")";
+ }
+
+ /** Calls visit() repeatedly on the calling thread until it reports that copying is done.
+  *
+  *  @throws RuntimeException if the job failed (was cancelled with an exception) */
+ @Override
+ public void runBlocking() throws IOException {
+   boolean done;
+   do {
+     done = visit();
+   } while (!done);
+
+   if (getFailed()) {
+     throw new RuntimeException("copy failed: " + cancelReason, exc);
+   }
+ }
+
+ /** Returns the CopyState passed to the constructor. */
+ @Override
+ public CopyState getCopyState() {
+ return copyState;
+ }
+
+ /** Returns true if this job and the other job have at least one file name in common among their files still to copy.
+  *  Locks this job first and then the other one. The other job must be a {@code SimpleCopyJob}. */
+ @Override
+ public synchronized boolean conflicts(CopyJob _other) {
+   Set<String> ourNames = new HashSet<>();
+   Iterator<Map.Entry<String,FileMetaData>> ours = toCopy.iterator();
+   while (ours.hasNext()) {
+     ourNames.add(ours.next().getKey());
+   }
+
+   SimpleCopyJob other = (SimpleCopyJob) _other;
+   synchronized (other) {
+     Iterator<Map.Entry<String,FileMetaData>> theirs = other.toCopy.iterator();
+     while (theirs.hasNext()) {
+       if (ourNames.contains(theirs.next().getKey())) {
+         return true;
+       }
+     }
+   }
+
+   return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
new file mode 100644
index 0000000..18e77ef
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -0,0 +1,674 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** A primary node that uses simple TCP connections to send commands and copy files */
+
+class SimplePrimaryNode extends PrimaryNode {
+
+ // Port number sent to replicas along with pre-copy and new-NRT-point requests (presumably the port this primary
+ // listens on -- confirm against the server code):
+ final int tcpPort;
+
+ // These are updated by parent test process whenever replicas change:
+ // (both stay null until setReplicas is first called; index i of each array describes the same replica)
+ int[] replicaTCPPorts;
+ int[] replicaIDs;
+
+ // So we only flip a bit once per file name:
+ final Set<String> bitFlipped = Collections.synchronizedSet(new HashSet<>());
+
+ /** Tracks the replica connections that are still pre-copying (warming) one merged segment's files. */
+ static class MergePreCopy {
+   // Connections to replicas that have not yet finished (or been given up on):
+   final List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
+   // The merged segment's files being pre-copied:
+   final Map<String,FileMetaData> files;
+   // Set once finished() has observed that no connections remain; no connection is accepted after that:
+   private boolean finished;
+
+   public MergePreCopy(Map<String,FileMetaData> files) {
+     this.files = files;
+   }
+
+   /** Adds the connection unless this pre-copy has already finished; returns whether it was added. */
+   public synchronized boolean tryAddConnection(Connection c) {
+     if (finished) {
+       return false;
+     }
+     connections.add(c);
+     return true;
+   }
+
+   /** Returns true, and marks this pre-copy finished, if no connections remain; otherwise returns false. */
+   public synchronized boolean finished() {
+     if (connections.isEmpty() == false) {
+       return false;
+     }
+     finished = true;
+     return true;
+   }
+ }
+
+ // Merges currently being pre-copied to replicas: preCopyMergedSegmentFiles adds its entry on start and removes it on exit:
+ final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
+
+ /** Creates the IndexWriter via initWriter and passes it, with the remaining arguments, to the PrimaryNode constructor;
+  *  then records the TCP port. */
+ public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+ super(initWriter(id, random, indexPath), id, primaryGen, forcePrimaryVersion, searcherFactory);
+ this.tcpPort = tcpPort;
+ }
+
+ /** Records currently alive replicas. The two arrays are stored as-is (not copied); handleFlush reads them by the same
+  *  index, so entry i of each must describe the same replica. */
+ public synchronized void setReplicas(int[] replicaIDs, int[] replicaTCPPorts) {
+ message("top: set replicasIDs=" + Arrays.toString(replicaIDs) + " tcpPorts=" + Arrays.toString(replicaTCPPorts));
+ this.replicaIDs = replicaIDs;
+ this.replicaTCPPorts = replicaTCPPorts;
+ }
+
+ /** Opens an IndexWriter on the node's directory, using a randomized test config whose merge policy (when it is a tiered
+  *  or log merge policy) is tuned to merge more often. */
+ private static IndexWriter initWriter(int id, Random random, Path indexPath) throws IOException {
+   Directory dir = SimpleReplicaNode.getDirectory(random, id, indexPath);
+
+   MockAnalyzer analyzer = new MockAnalyzer(random);
+   analyzer.setMaxTokenLength(TestUtil.nextInt(random, 1, IndexWriter.MAX_TERM_LENGTH));
+   IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random, analyzer);
+   //config.setInfoStream(new PrintStreamInfoStream(System.out));
+
+   // Force more frequent merging so we stress merge warming:
+   MergePolicy policy = config.getMergePolicy();
+   if (policy instanceof TieredMergePolicy) {
+     TieredMergePolicy tiered = (TieredMergePolicy) policy;
+     tiered.setSegmentsPerTier(3);
+     tiered.setMaxMergeAtOnce(3);
+   } else if (policy instanceof LogMergePolicy) {
+     ((LogMergePolicy) policy).setMergeFactor(3);
+   }
+
+   IndexWriter indexWriter = new IndexWriter(dir, config);
+   TestUtil.reduceOpenFiles(indexWriter);
+   return indexWriter;
+ }
+
+ /** Asks every currently known replica to pre-copy ("warm") the files of a newly merged segment, then blocks until each
+  *  replica has either reported that it is done, gone quiet for more than 2 seconds, or failed. Returns early (closing
+  *  all warm connections) if this primary is closed in the meantime, and returns immediately if no replicas have been
+  *  set yet.
+  *
+  *  @param info the merged segment, used only in log messages
+  *  @param files the merged segment's files and their meta data, sent to each replica */
+ @Override
+ protected void preCopyMergedSegmentFiles(SegmentCommitInfo info, Map<String,FileMetaData> files) throws IOException {
+   int[] replicaTCPPorts = this.replicaTCPPorts;
+   if (replicaTCPPorts == null) {
+     message("no replicas; skip warming " + info);
+     return;
+   }
+
+   message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet());
+
+   MergePreCopy preCopy = new MergePreCopy(files);
+   warmingSegments.add(preCopy);
+
+   try {
+
+     // Ask all currently known replicas to pre-copy this newly merged segment's files:
+     for (int replicaTCPPort : replicaTCPPorts) {
+       Connection c = null;
+       try {
+         c = new Connection(replicaTCPPort);
+         c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
+         c.out.writeVLong(primaryGen);
+         c.out.writeVInt(tcpPort);
+         SimpleServer.writeFilesMetaData(c.out, files);
+         c.flush();
+         c.s.shutdownOutput();
+         message("warm connection " + c.s);
+         preCopy.connections.add(c);
+       } catch (Throwable t) {
+         // The connection may have opened before the request failed; it was never handed to preCopy, so nobody else
+         // will close it:
+         if (c != null) {
+           IOUtils.closeWhileHandlingException(c);
+         }
+         message("top: ignore exception trying to warm to replica port " + replicaTCPPort + ": " + t);
+         //t.printStackTrace(System.out);
+       }
+     }
+
+     long startNS = System.nanoTime();
+     long lastWarnNS = startNS;
+
+     // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replicas to finish copying?
+     while (preCopy.finished() == false) {
+       try {
+         Thread.sleep(10);
+       } catch (InterruptedException ie) {
+         throw new ThreadInterruptedException(ie);
+       }
+
+       if (isClosed()) {
+         message("top: primary is closing: now cancel segment warming");
+         synchronized(preCopy.connections) {
+           IOUtils.closeWhileHandlingException(preCopy.connections);
+         }
+         return;
+       }
+
+       long ns = System.nanoTime();
+       if (ns - lastWarnNS > 1000000000L) {
+         // Pass info as a format argument rather than splicing it into the pattern, so a '%' in its text cannot
+         // break the format string:
+         message(String.format(Locale.ROOT,
+                               "top: warning: still warming merge %s to %d replicas for %.1f sec...",
+                               info, preCopy.connections.size(), (ns - startNS)/1000000000.0));
+         lastWarnNS = ns;
+       }
+
+       // Process keep-alives:
+       synchronized(preCopy.connections) {
+         Iterator<Connection> it = preCopy.connections.iterator();
+         while (it.hasNext()) {
+           Connection c = it.next();
+           try {
+             long nowNS = System.nanoTime();
+             boolean done = false;
+             // Only read what is already buffered, so one silent replica cannot block this loop:
+             while (c.sockIn.available() > 0) {
+               byte b = c.in.readByte();
+               if (b == 0) {
+                 // keep-alive
+                 c.lastKeepAliveNS = nowNS;
+                 message("keep-alive for socket=" + c.s + " merge files=" + files.keySet());
+               } else {
+                 // merge is done pre-copying to this node
+                 if (b != 1) {
+                   throw new IllegalArgumentException();
+                 }
+                 message("connection socket=" + c.s + " is done warming its merge " + info + " files=" + files.keySet());
+                 IOUtils.closeWhileHandlingException(c);
+                 it.remove();
+                 done = true;
+                 break;
+               }
+             }
+
+             // If > 2 sec since we saw a keep-alive, assume this replica is dead:
+             if (done == false && nowNS - c.lastKeepAliveNS > 2000000000L) {
+               message("top: warning: replica socket=" + c.s + " for segment=" + info + " seems to be dead; closing files=" + files.keySet());
+               IOUtils.closeWhileHandlingException(c);
+               it.remove();
+             }
+           } catch (Throwable t) {
+             message("top: ignore exception trying to read byte during warm for segment=" + info + " to replica socket=" + c.s + ": " + t + " files=" + files.keySet());
+             IOUtils.closeWhileHandlingException(c);
+             it.remove();
+           }
+         }
+       }
+     }
+   } finally {
+     warmingSegments.remove(preCopy);
+   }
+ }
+
+ /** Flushes all indexing ops to disk and notifies all replicas that they should now copy.
+  *
+  *  <p>Writes one long to {@code topOut}: the new copy-state version if something was flushed, otherwise the negated
+  *  current version. In the flushed case the reply is pushed out through {@code bos} before any replica is contacted.
+  *  Replicas that cannot be reached are logged and skipped.
+  *
+  *  @param topIn input side of the caller's connection (not read here)
+  *  @param topOut output side of the caller's connection
+  *  @param bos stream to flush so the version reaches the caller before replicas are notified */
+ private void handleFlush(DataInput topIn, DataOutput topOut, BufferedOutputStream bos) throws IOException {
+   Thread.currentThread().setName("flush");
+
+   int[] replicaTCPPorts;
+   int[] replicaIDs;
+   synchronized (this) {
+     replicaTCPPorts = this.replicaTCPPorts;
+     replicaIDs = this.replicaIDs;
+   }
+
+   // setReplicas may not have been called yet; treat that as "no replicas" (as preCopyMergedSegmentFiles does)
+   // instead of failing with a NullPointerException:
+   int replicaCount = replicaIDs == null ? 0 : replicaIDs.length;
+
+   message("now flush; " + replicaCount + " replicas");
+
+   if (flushAndRefresh()) {
+     // Something did get flushed (there were indexing ops since the last flush):
+
+     // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we
+     // (possibly) pushed to some replicas. Alternatively we could make this 2 separate ops?
+     long version = getCopyStateVersion();
+     message("send flushed version=" + version);
+     topOut.writeLong(version);
+     bos.flush();
+
+     // Notify current replicas:
+     for(int i=0;i<replicaCount;i++) {
+       int replicaID = replicaIDs[i];
+       try (Connection c = new Connection(replicaTCPPorts[i])) {
+         c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
+         c.out.writeVLong(version);
+         c.out.writeInt(tcpPort);
+         c.flush();
+         // TODO: we should use multicast to broadcast files out to replicas
+         // TODO: ... replicas could copy from one another instead of just primary
+         // TODO: we could also prioritize one replica at a time?
+       } catch (Throwable t) {
+         message("top: failed to connect R" + replicaID + " for newNRTPoint; skipping: " + t.getMessage());
+       }
+     }
+   } else {
+     // No changes flushed:
+     topOut.writeLong(-getCopyStateVersion());
+   }
+ }
+
+ /** Pushes CopyState on the wire: the infos bytes (length-prefixed), gen, version, the files' meta data, the completed
+  *  merge file names (count-prefixed) and finally the primary gen. */
+ private static void writeCopyState(CopyState state, DataOutput out) throws IOException {
+   // TODO (opto): we could encode to byte[] once when we created the copyState, and then just send same byts to all replicas...
+   byte[] infosBytes = state.infosBytes;
+   out.writeVInt(infosBytes.length);
+   out.writeBytes(infosBytes, 0, infosBytes.length);
+
+   out.writeVLong(state.gen);
+   out.writeVLong(state.version);
+   SimpleServer.writeFilesMetaData(out, state.files);
+
+   out.writeVInt(state.completedMergeFiles.size());
+   for (String mergedFileName : state.completedMergeFiles) {
+     out.writeString(mergedFileName);
+   }
+
+   out.writeVLong(state.primaryGen);
+ }
+
+ /** Called when another node (replica) wants to copy files from us.
+  *
+  *  <p>Reads the replica id and a flag byte (1 = replica also wants the latest CopyState, 0 = it already has one), sends
+  *  the CopyState if requested, then serves file requests until the replica sends the terminating 1 byte. For each
+  *  requested file the total length is sent first, followed by the file's bytes from the requested offset on. When
+  *  {@code TestNRTReplication.DO_BIT_FLIPS_DURING_COPY} is set, a bit is occasionally flipped in the bytes sent, at most
+  *  once per file name.
+  *
+  *  @return true if all requests were served; false if an exception occurred, in which case the socket has been closed
+  *  @throws IllegalArgumentException if the CopyState flag byte is neither 0 nor 1 */
+ private boolean handleFetchFiles(Random random, Socket socket, DataInput destIn, DataOutput destOut, BufferedOutputStream bos) throws IOException {
+ Thread.currentThread().setName("send");
+
+ int replicaID = destIn.readVInt();
+ message("top: start fetch for R" + replicaID + " socket=" + socket);
+ byte b = destIn.readByte();
+ CopyState copyState;
+ if (b == 0) {
+ // Caller already has CopyState
+ copyState = null;
+ } else if (b == 1) {
+ // Caller does not have CopyState; we pull the latest one:
+ copyState = getCopyState();
+ Thread.currentThread().setName("send-R" + replicaID + "-" + copyState.version);
+ } else {
+ // Protocol error:
+ throw new IllegalArgumentException("invalid CopyState byte=" + b);
+ }
+
+ try {
+ if (copyState != null) {
+ // Serialize CopyState on the wire to the client:
+ writeCopyState(copyState, destOut);
+ bos.flush();
+ }
+
+ byte[] buffer = new byte[16384];
+ int fileCount = 0;
+ long totBytesSent = 0;
+ while (true) {
+ // 0 = another file request follows, 1 = no more requests:
+ byte done = destIn.readByte();
+ if (done == 1) {
+ break;
+ } else if (done != 0) {
+ throw new IllegalArgumentException("expected 0 or 1 byte but got " + done);
+ }
+
+ // Name of the file the replica wants us to send:
+ String fileName = destIn.readString();
+
+ // Starting offset in the file we should start sending bytes from:
+ long fpStart = destIn.readVLong();
+
+ try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
+ long len = in.length();
+ //message("fetch " + fileName + ": send len=" + len);
+ // The full file length is sent even when the copy starts at a non-zero offset:
+ destOut.writeVLong(len);
+ in.seek(fpStart);
+ long upto = fpStart;
+ while (upto < len) {
+ int chunk = (int) Math.min(buffer.length, (len-upto));
+ in.readBytes(buffer, 0, chunk);
+ if (TestNRTReplication.DO_BIT_FLIPS_DURING_COPY) {
+ // Rarely (1 in 3000 chunks) corrupt one bit of the outgoing chunk, but only once per file name:
+ if (random.nextInt(3000) == 17 && bitFlipped.contains(fileName) == false) {
+ bitFlipped.add(fileName);
+ message("file " + fileName + " to R" + replicaID + ": now randomly flipping a bit at byte=" + upto);
+ int x = random.nextInt(chunk);
+ int bit = random.nextInt(8);
+ buffer[x] ^= 1 << bit;
+ }
+ }
+ destOut.writeBytes(buffer, 0, chunk);
+ upto += chunk;
+ totBytesSent += chunk;
+ }
+ }
+
+ fileCount++;
+ }
+
+ message("top: done fetch files for R" + replicaID + ": sent " + fileCount + " files; sent " + totBytesSent + " bytes");
+ } catch (Throwable t) {
+ // Any failure ends this fetch: only the exception's message is logged, then the socket is closed:
+ message("top: exception during fetch: " + t.getMessage() + "; now close socket");
+ socket.close();
+ return false;
+ } finally {
+ // Release the CopyState we pulled above, on success and on failure alike:
+ if (copyState != null) {
+ message("top: fetch: now release CopyState");
+ releaseCopyState(copyState);
+ }
+ }
+
+ return true;
+ }
+
+ /** Field type shared by the tokenized "titleTokenized" and "body" fields; starts as a copy of {@link TextField#TYPE_STORED}. */
+ static final FieldType tokenizedWithTermVectors = newTokenizedWithTermVectors();
+
+ /** Builds the field type: docs+freqs+positions postings, plus term vectors carrying offsets and positions. */
+ private static FieldType newTokenizedWithTermVectors() {
+   FieldType type = new FieldType(TextField.TYPE_STORED);
+   type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
+   type.setStoreTermVectors(true);
+   type.setStoreTermVectorOffsets(true);
+   type.setStoreTermVectorPositions(true);
+   return type;
+ }
+
+ /**
+  * Serves one bulk-indexing session: applies indexing ops read from {@code in} until the client
+  * sends {@code CMD_INDEXING_DONE} or the stream reaches EOF.  Every op, including the final
+  * done op, is acknowledged with a single byte 1 followed by a flush of {@code bos}.
+  *
+  * @throws IllegalArgumentException if an unknown op byte is received
+  */
+ private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException {
+   Thread.currentThread().setName("indexing");
+   message("start handling indexing socket=" + socket);
+   for (;;) {
+     byte op;
+     try {
+       op = in.readByte();
+     } catch (EOFException eofe) {
+       // Stream ended without an explicit done op: nothing more to do
+       return;
+     }
+
+     switch (op) {
+       case CMD_ADD_DOC:
+         handleAddDocument(in, out);
+         break;
+       case CMD_UPDATE_DOC:
+         handleUpdateDocument(in, out);
+         break;
+       case CMD_DELETE_DOC:
+         handleDeleteDocument(in, out);
+         break;
+       case CMD_INDEXING_DONE:
+         // Nothing to apply; it is acked below like any other op
+         break;
+       default:
+         throw new IllegalArgumentException("cmd must be add, update or delete; got " + op);
+     }
+
+     // Ack this op:
+     out.writeByte((byte) 1);
+     bos.flush();
+
+     if (op == CMD_INDEXING_DONE) {
+       return;
+     }
+   }
+ }
+
+ /**
+  * Reads one document from {@code in} (a vInt field count, then name/value string pairs) and
+  * adds it to the index.
+  *
+  * @param out not used by this method; the caller writes the ack
+  * @throws IllegalArgumentException if a field name other than docid, marker, title or body is received
+  */
+ private void handleAddDocument(DataInput in, DataOutput out) throws IOException {
+   int numFields = in.readVInt();
+   Document doc = new Document();
+   for (int remaining = numFields; remaining > 0; remaining--) {
+     String fieldName = in.readString();
+     String fieldValue = in.readString();
+     // NOTE: clearly NOT general!
+     switch (fieldName) {
+       case "docid":
+       case "marker":
+         doc.add(new StringField(fieldName, fieldValue, Field.Store.YES));
+         break;
+       case "title":
+         // Indexed twice: once verbatim, once tokenized
+         doc.add(new StringField("title", fieldValue, Field.Store.YES));
+         doc.add(new Field("titleTokenized", fieldValue, tokenizedWithTermVectors));
+         break;
+       case "body":
+         doc.add(new Field("body", fieldValue, tokenizedWithTermVectors));
+         break;
+       default:
+         throw new IllegalArgumentException("unhandled field name " + fieldName);
+     }
+   }
+
+   writer.addDocument(doc);
+ }
+
+ /**
+  * Reads one document from {@code in} (a vInt field count, then name/value string pairs) and
+  * replaces any existing document that has the same {@code docid} term.
+  *
+  * <p>NOTE: only the hard-wired field names docid, marker, title and body are understood.
+  *
+  * @param out not used by this method; the caller writes the ack
+  * @throws IllegalArgumentException if an unknown field name is received, or if the document
+  *         carries no docid field (there would be no term to update by)
+  */
+ private void handleUpdateDocument(DataInput in, DataOutput out) throws IOException {
+   int fieldCount = in.readVInt();
+   Document doc = new Document();
+   String docid = null;
+   for(int i=0;i<fieldCount;i++) {
+     String name = in.readString();
+     String value = in.readString();
+     // NOTE: clearly NOT general!
+     if (name.equals("docid")) {
+       docid = value;
+       doc.add(new StringField("docid", value, Field.Store.YES));
+     } else if (name.equals("marker")) {
+       doc.add(new StringField("marker", value, Field.Store.YES));
+     } else if (name.equals("title")) {
+       doc.add(new StringField("title", value, Field.Store.YES));
+       doc.add(new Field("titleTokenized", value, tokenizedWithTermVectors));
+     } else if (name.equals("body")) {
+       doc.add(new Field("body", value, tokenizedWithTermVectors));
+     } else {
+       throw new IllegalArgumentException("unhandled field name " + name);
+     }
+   }
+
+   if (docid == null) {
+     // Fail with a clear protocol error instead of an opaque NullPointerException from new Term(..., null):
+     throw new IllegalArgumentException("update document must include a docid field; got " + fieldCount + " fields without one");
+   }
+
+   writer.updateDocument(new Term("docid", docid), doc);
+ }
+
+ /**
+  * Reads a docid string from {@code in} and deletes all documents whose "docid" term matches it.
+  *
+  * @param out not used by this method; the caller writes the ack
+  */
+ private void handleDeleteDocument(DataInput in, DataOutput out) throws IOException {
+   Term idTerm = new Term("docid", in.readString());
+   writer.deleteDocuments(idTerm);
+ }
+
+ // Sent to primary to cutover new SIS (presumably SegmentInfos -- confirm); dispatched to handleFlush:
+ static final byte CMD_FLUSH = 10;
+ // Command bytes also handled by SimpleReplicaNode.handleOneConnection: GET_SEARCHING_VERSION, SEARCH, MARKER_SEARCH, COMMIT and CLOSE.
+ // Sent by replica to primary asking to copy a set of files over:
+ static final byte CMD_FETCH_FILES = 1;
+ static final byte CMD_GET_SEARCHING_VERSION = 12; // reply: vLong getCurrentSearchingVersion()
+ static final byte CMD_SEARCH = 2; // runs TermQuery body:the; reply: vLong reader version, vInt hit count
+ static final byte CMD_MARKER_SEARCH = 3; // runs TermQuery marker:marker; reply: vLong reader version, vInt hit count
+ static final byte CMD_COMMIT = 4; // calls commit(); reply: byte 1
+ static final byte CMD_CLOSE = 5; // closes the node's server socket; reply: byte 1
+
+ // Send (to primary) the list of currently running replicas:
+ static final byte CMD_SET_REPLICAS = 16;
+
+ // Multiple indexing ops: CMD_INDEXING opens a session in which ADD/UPDATE/DELETE ops are each acked with byte 1, until CMD_INDEXING_DONE:
+ static final byte CMD_INDEXING = 18;
+ static final byte CMD_ADD_DOC = 6;
+ static final byte CMD_UPDATE_DOC = 7;
+ static final byte CMD_DELETE_DOC = 8;
+ static final byte CMD_INDEXING_DONE = 19;
+
+ // Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
+ static final byte CMD_NEW_REPLICA = 20;
+
+ /**
+  * Handles incoming requests to the naive TCP server wrapping this node.
+  *
+  * <p>Polls {@code is} until a command byte is available (returning early once {@code stop} is set), then
+  * dispatches on that byte.  {@code CMD_SEARCH} and {@code CMD_MARKER_SEARCH} flush their reply and loop back
+  * to wait for another command on the same connection; every other command is handled once, after which
+  * {@code bos} is flushed and this method returns.
+  *
+  * @throws IllegalArgumentException if the command byte is not recognized
+  */
+ void handleOneConnection(Random random, ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+
+   outer:
+   while (true) {
+     byte cmd;
+     // Poll instead of blocking in read, so we notice when the server is being stopped:
+     while (true) {
+       if (is.available() > 0) {
+         break;
+       }
+       if (stop.get()) {
+         return;
+       }
+       Thread.sleep(10);
+     }
+
+     try {
+       cmd = in.readByte();
+     } catch (EOFException eofe) {
+       break;
+     }
+
+     switch (cmd) {
+
+       case CMD_FLUSH:
+         handleFlush(in, out, bos);
+         break;
+
+       case CMD_FETCH_FILES:
+         // Replica (other node) is asking us (primary node) for files to copy
+         handleFetchFiles(random, socket, in, out, bos);
+         break;
+
+       case CMD_INDEXING:
+         handleIndexing(socket, in, out, bos);
+         break;
+
+       case CMD_GET_SEARCHING_VERSION:
+         out.writeVLong(getCurrentSearchingVersion());
+         break;
+
+       case CMD_SEARCH:
+         {
+           Thread.currentThread().setName("search");
+           IndexSearcher searcher = mgr.acquire();
+           try {
+             long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+             int hitCount = searcher.search(new TermQuery(new Term("body", "the")), 1).totalHits;
+             //message("version=" + version + " searcher=" + searcher);
+             out.writeVLong(version);
+             out.writeVInt(hitCount);
+           } finally {
+             mgr.release(searcher);
+           }
+           // We loop back to wait for the next command, so the reply must be pushed out now or it
+           // would sit in the buffer while the client waits for it:
+           bos.flush();
+         }
+         continue outer;
+
+       case CMD_MARKER_SEARCH:
+         {
+           Thread.currentThread().setName("msearch");
+           IndexSearcher searcher = mgr.acquire();
+           try {
+             long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+             int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+             out.writeVLong(version);
+             out.writeVInt(hitCount);
+           } finally {
+             mgr.release(searcher);
+           }
+           // Same as CMD_SEARCH: flush before waiting for the next command
+           bos.flush();
+         }
+         continue outer;
+
+       case CMD_COMMIT:
+         Thread.currentThread().setName("commit");
+         commit();
+         out.writeByte((byte) 1);
+         break;
+
+       case CMD_CLOSE:
+         Thread.currentThread().setName("close");
+         message("top close: now close server socket");
+         ss.close();
+         out.writeByte((byte) 1);
+         message("top close: done close server socket");
+         break;
+
+       case CMD_SET_REPLICAS:
+         Thread.currentThread().setName("set repls");
+         int count = in.readVInt();
+         int[] replicaIDs = new int[count];
+         int[] replicaTCPPorts = new int[count];
+         for(int i=0;i<count;i++) {
+           replicaIDs[i] = in.readVInt();
+           replicaTCPPorts[i] = in.readVInt();
+         }
+         out.writeByte((byte) 1);
+         setReplicas(replicaIDs, replicaTCPPorts);
+         break;
+
+       case CMD_NEW_REPLICA:
+         Thread.currentThread().setName("new repl");
+         int replicaTCPPort = in.readVInt();
+         message("new replica: " + warmingSegments.size() + " current warming merges");
+         // Step through all currently warming segments and try to add this replica if it isn't there already:
+         for(MergePreCopy preCopy : warmingSegments) {
+           message("warming segment " + preCopy.files.keySet());
+           boolean found = false;
+           synchronized (preCopy.connections) {
+             for(Connection c : preCopy.connections) {
+               if (c.destTCPPort == replicaTCPPort) {
+                 found = true;
+                 break;
+               }
+             }
+           }
+
+           if (found) {
+             message("this replica is already warming this segment; skipping");
+             // It's possible (maybe) that the replica started up, then a merge kicked off, and it warmed to this new replica, all before the
+             // replica sent us this command:
+             continue;
+           }
+
+           // OK, this new replica is not already warming this segment, so attempt (could fail) to start warming now:
+
+           Connection c = new Connection(replicaTCPPort);
+           if (preCopy.tryAddConnection(c) == false) {
+             // This can happen, if all other replicas just now finished warming this segment, and so we were just a bit too late.  In this
+             // case the segment will be copied over in the next nrt point sent to this replica
+             message("failed to add connection to segment warmer (too late); closing");
+             c.close();
+             // The connection is closed now: do not send the pre-copy request over it
+             continue;
+           }
+           c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
+           c.out.writeVLong(primaryGen);
+           c.out.writeVInt(tcpPort);
+           SimpleServer.writeFilesMetaData(c.out, preCopy.files);
+           c.flush();
+           c.s.shutdownOutput();
+           message("successfully started warming");
+         }
+         break;
+
+       default:
+         throw new IllegalArgumentException("unrecognized cmd=" + cmd + " via socket=" + socket);
+     }
+     bos.flush();
+     break;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
new file mode 100644
index 0000000..8667df1
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -0,0 +1,316 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.LuceneTestCase;
+
+class SimpleReplicaNode extends ReplicaNode {
+ final int tcpPort; // TCP port passed to the constructor; sent to the primary by sendNewReplica
+ final Jobs jobs; // thread that handles fetching files from the primary; created and started in the constructor
+
+ // Rate limits incoming bytes/sec when fetching files:
+ final RateLimiter fetchRateLimiter;
+ final AtomicLong bytesSinceLastRateLimiterCheck = new AtomicLong(); // NOTE(review): not referenced in this class body; presumably updated by the copy job code -- confirm
+ final Random random; // private Random seeded in the constructor; used for the random keep-alive skipping in handleOneConnection
+
+ /** Changes over time, as primary node crashes and moves around */
+ int curPrimaryTCPPort;
+
+ public SimpleReplicaNode(Random random, int id, int tcpPort, Path indexPath, long curPrimaryGen, int primaryTCPPort, SearcherFactory searcherFactory) throws IOException { // Opens the directory, picks a random fetch rate limit, starts the node at curPrimaryGen, then launches the copy-jobs thread
+ super(id, getDirectory(random, id, indexPath), searcherFactory);
+ this.tcpPort = tcpPort;
+ this.random = new Random(random.nextLong()); // own Random seeded from the caller's, so later draws here do not advance the caller's sequence
+
+ // Random IO throttling on file copies: 5 - 20 MB/sec:
+ double mbPerSec = 5 * (1.0 + 3*random.nextDouble());
+ message(String.format(Locale.ROOT, "top: will rate limit file fetch to %.2f MB/sec", mbPerSec));
+ fetchRateLimiter = new RateLimiter.SimpleRateLimiter(mbPerSec);
+ this.curPrimaryTCPPort = primaryTCPPort;
+ // NOTE: start() runs before the jobs thread below is created
+ start(curPrimaryGen);
+
+ // Handles fetching files from primary:
+ jobs = new Jobs(this);
+ jobs.setName("R" + id + ".copyJobs");
+ jobs.setDaemon(true); // daemon: does not keep the JVM alive on its own
+ jobs.start();
+ }
+
+ @Override
+ protected void launch(CopyJob job) { // delegates the job to the jobs thread started in the constructor
+ jobs.launch(job);
+ }
+
+ @Override
+ public void close() throws IOException { // Shutdown order: close the copy-jobs thread, cancel every merge copy job, then close the node itself
+ // Can't be sync'd when calling jobs since it can lead to deadlock:
+ jobs.close();
+ message("top: jobs closed");
+ synchronized(mergeCopyJobs) { // mergeCopyJobs is not declared in this class (presumably inherited from ReplicaNode -- confirm)
+ for (CopyJob job : mergeCopyJobs) {
+ message("top: cancel merge copy job " + job);
+ job.cancel("jobs closing", null);
+ }
+ }
+ super.close();
+ }
+
+ /**
+  * Opens a connection to the current primary, sends {@code CMD_FETCH_FILES}, and wraps the
+  * connection in a new {@link SimpleCopyJob}.
+  *
+  * <p>If {@code files} is null the primary is asked for its latest CopyState, which is read off
+  * the wire and supplies the files to copy; otherwise the caller's {@code files} are used and no
+  * CopyState is requested.
+  *
+  * @param prevFiles not used by this implementation
+  * @throws NodeCommunicationException if anything goes wrong talking to the primary; the
+  *         connection (if it was opened) is closed before this is thrown
+  */
+ @Override
+ protected CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
+                              boolean highPriority, CopyJob.OnceDone onceDone) throws IOException {
+   Connection c = null;
+   CopyState copyState;
+
+   // Exceptions in here mean something went wrong talking over the socket, which are fine (e.g. primary node crashed):
+   try {
+     c = new Connection(curPrimaryTCPPort);
+     c.out.writeByte(SimplePrimaryNode.CMD_FETCH_FILES);
+     c.out.writeVInt(id);
+     if (files == null) {
+       // No incoming CopyState: ask primary for latest one now
+       c.out.writeByte((byte) 1);
+       c.flush();
+       copyState = SimpleServer.readCopyState(c.in);
+       files = copyState.files;
+     } else {
+       c.out.writeByte((byte) 0);
+       copyState = null;
+     }
+   } catch (Throwable t) {
+     // No job will ever own this connection, so close it here rather than leak the socket:
+     if (c != null) {
+       try {
+         c.close();
+       } catch (Throwable closeFailure) {
+         t.addSuppressed(closeFailure);
+       }
+     }
+     throw new NodeCommunicationException("exc while reading files to copy", t);
+   }
+
+   return new SimpleCopyJob(reason, c, copyState, this, files, highPriority, onceDone);
+ }
+
+ static Directory getDirectory(Random random, int id, Path path) throws IOException { // Opens a MockDirectoryWrapper over path, configured for this test; the random parameter is not used in this body
+ MockDirectoryWrapper dir = LuceneTestCase.newMockFSDirectory(path);
+
+ dir.setAssertNoUnrefencedFilesOnClose(true);
+ // This is very costly (takes more time to check than it did to index); we do this ourselves in the end instead of each time a replica
+ // is restarted:
+ dir.setCheckIndexOnClose(false);
+
+ // Corrupt any index files not referenced by current commit point; this is important (increases test evilness) because we may have done
+ // a hard crash of the previous JVM writing to this directory and so MDW's corrupt-unknown-files-on-close never ran:
+ Node.nodeMessage(id, "top: corrupt unknown files");
+ dir.corruptUnknownFiles();
+
+ return dir;
+ }
+
+ static final byte CMD_NEW_NRT_POINT = 0; // payload as read by handleOneConnection: vLong version, then int primary TCP port
+
+ // Sent by primary to replica to pre-copy merge files:
+ static final byte CMD_PRE_COPY_MERGE = 17;
+
+ /**
+  * Handles incoming requests to the naive TCP server wrapping this node.
+  *
+  * <p>Polls {@code is} until a command byte is available (returning early once {@code stop} is set), then
+  * dispatches on that byte.  {@code CMD_SEARCH} and {@code CMD_MARKER_SEARCH} flush their reply and loop back
+  * to wait for another command on the same connection; every other command is handled once and then this
+  * method returns.
+  *
+  * @throws IllegalArgumentException if the command byte is not recognized
+  */
+ void handleOneConnection(ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+   outer:
+   while (true) {
+     byte cmd;
+     // Poll instead of blocking in read, so we notice when the server is being stopped:
+     while (true) {
+       if (is.available() > 0) {
+         break;
+       }
+       if (stop.get()) {
+         return;
+       }
+       Thread.sleep(10);
+     }
+
+     try {
+       cmd = in.readByte();
+     } catch (EOFException eofe) {
+       break;
+     }
+
+     switch(cmd) {
+       case CMD_NEW_NRT_POINT:
+         {
+           long version = in.readVLong();
+           Thread.currentThread().setName("recv-" + version);
+           curPrimaryTCPPort = in.readInt();
+           newNRTPoint(version);
+         }
+         break;
+
+       case SimplePrimaryNode.CMD_GET_SEARCHING_VERSION:
+         // nocommit this is hacky:
+
+         // Tricky: if a sync is just finishing up, i.e. managed to finish copying all files just before we crashed primary, and is now
+         // in the process of opening a new reader, we need to wait for it, to be sure we really pick the most current replica:
+         if (isCopying()) {
+           message("top: getSearchingVersion: now wait for finish sync");
+           // TODO: use immediate concurrency instead of polling:
+           while (isCopying() && stop.get() == false) {
+             Thread.sleep(50);
+             message("top: curNRTCopy=" + curNRTCopy);
+           }
+           message("top: getSearchingVersion: done wait for finish sync");
+         }
+         if (stop.get() == false) {
+           out.writeVLong(getCurrentSearchingVersion());
+         } else {
+           message("top: getSearchingVersion: stop waiting for finish sync: stop is set");
+         }
+         break;
+
+       case SimplePrimaryNode.CMD_SEARCH:
+         {
+           Thread.currentThread().setName("search");
+           IndexSearcher searcher = mgr.acquire();
+           try {
+             long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+             int hitCount = searcher.search(new TermQuery(new Term("body", "the")), 1).totalHits;
+             //node.message("version=" + version + " searcher=" + searcher);
+             out.writeVLong(version);
+             out.writeVInt(hitCount);
+           } finally {
+             mgr.release(searcher);
+           }
+           // We loop back to wait for the next command, so the reply must be pushed out now or it
+           // would sit in the buffer while the client waits for it:
+           bos.flush();
+         }
+         continue outer;
+
+       case SimplePrimaryNode.CMD_MARKER_SEARCH:
+         {
+           Thread.currentThread().setName("msearch");
+           IndexSearcher searcher = mgr.acquire();
+           try {
+             long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+             int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+             out.writeVLong(version);
+             out.writeVInt(hitCount);
+           } finally {
+             mgr.release(searcher);
+           }
+           // Same as CMD_SEARCH: flush before waiting for the next command
+           bos.flush();
+         }
+         continue outer;
+
+       case SimplePrimaryNode.CMD_COMMIT:
+         Thread.currentThread().setName("commit");
+         commit();
+         out.writeByte((byte) 1);
+         break;
+
+       case SimplePrimaryNode.CMD_CLOSE:
+         Thread.currentThread().setName("close");
+         ss.close();
+         out.writeByte((byte) 1);
+         break outer;
+
+       case CMD_PRE_COPY_MERGE:
+         Thread.currentThread().setName("merge copy");
+
+         long newPrimaryGen = in.readVLong();
+         curPrimaryTCPPort = in.readVInt();
+         Map<String,FileMetaData> files = SimpleServer.readFilesMetaData(in);
+         message("done reading files to copy files=" + files.keySet());
+         AtomicBoolean finished = new AtomicBoolean();
+         CopyJob job = launchPreCopyMerge(finished, newPrimaryGen, files);
+         message("done launching copy job files=" + files.keySet());
+
+         // Silly keep alive mechanism, else if e.g. we (replica node) crash, the primary
+         // won't notice for a very long time:
+         boolean success = false;
+         try {
+           int count = 0;
+           while (true) {
+             if (finished.get() || stop.get()) {
+               break;
+             }
+             Thread.sleep(10);
+             count++;
+             if (count == 100) {
+               // Once per second or so, we send a keep alive
+               message("send merge pre copy keep alive... files=" + files.keySet());
+
+               // To be evil, we sometimes fail to keep-alive, e.g. simulating a long GC pausing us.
+               // NOTE: count is only reset when we do send, so after the first skipped keep-alive
+               // no further ones are sent for this copy:
+               if (random.nextBoolean()) {
+                 out.writeByte((byte) 0);
+                 // Push the keep-alive onto the wire now; left in the buffer the primary would
+                 // not see it until the final flush:
+                 bos.flush();
+                 count = 0;
+               }
+             }
+           }
+
+           // Tell the primary we are done:
+           out.writeByte((byte) 1);
+           bos.flush();
+           success = true;
+         } finally {
+           message("done merge copy files=" + files.keySet() + " success=" + success);
+         }
+         break;
+
+       default:
+         throw new IllegalArgumentException("unrecognized cmd=" + cmd);
+     }
+     bos.flush();
+
+     break;
+   }
+ }
+
+ @Override
+ protected void sendNewReplica() throws IOException { // Best effort: sends CMD_NEW_REPLICA plus this replica's TCP port to the current primary
+ message("send new_replica to primary tcpPort=" + curPrimaryTCPPort);
+ try (Connection c = new Connection(curPrimaryTCPPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_NEW_REPLICA);
+ c.out.writeVInt(tcpPort);
+ c.flush();
+ c.s.shutdownOutput(); // nothing more will be written on this connection
+ } catch (Throwable t) {
+ message("ignoring exc " + t + " sending new_replica to primary tcpPort=" + curPrimaryTCPPort); // deliberately not rethrown: any failure is only logged
+ }
+ }
+
+ @Override
+ public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException { // wraps the superclass's temp output in a RateLimitedIndexOutput driven by fetchRateLimiter
+ return new RateLimitedIndexOutput(fetchRateLimiter, super.createTempOutput(prefix, suffix, ioContext));
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
new file mode 100644
index 0000000..f03a5c3
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -0,0 +1,390 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.file.Path;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.Constants;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Child process with silly naive TCP socket server to handle
+ * between-node commands, launched for each node by TestNRTReplication. */
+@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
+@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
+public class SimpleServer extends LuceneTestCase {
+
+ final static Set<Thread> clientThreads = Collections.synchronizedSet(new HashSet<>()); // ClientHandler threads started by test(); finished ones are pruned after each accept, the rest are joined at shutdown
+ final static AtomicBoolean stop = new AtomicBoolean(); // set by test() once its accept loop exits; passed to the nodes' handleOneConnection and polled by the crasher thread
+
+ /** Thread that services a single accepted client socket and closes it when done. */
+ private static class ClientHandler extends Thread {
+
+   // We hold this just so we can close it to exit the process:
+   private final ServerSocket ss;
+   private final Socket socket;
+   private final Node node;
+   private final int bufferSize;
+
+   public ClientHandler(ServerSocket ss, Node node, Socket socket) {
+     this.ss = ss;
+     this.node = node;
+     this.socket = socket;
+     // Random stream buffer size, applied to both directions:
+     this.bufferSize = TestUtil.nextInt(random(), 128, 65536);
+     if (Node.VERBOSE_CONNECTIONS) {
+       node.message("new connection socket=" + socket);
+     }
+   }
+
+   /**
+    * Wraps the socket's streams, lets the node handle the connection, then flushes and closes the socket.
+    * A SocketException is logged and ignored; any other failure is logged and rethrown as a RuntimeException.
+    */
+   @Override
+   public void run() {
+     boolean success = false;
+     try {
+       //node.message("using stream buffer size=" + bufferSize);
+       InputStream rawIn = new BufferedInputStream(socket.getInputStream(), bufferSize);
+       DataInput dataIn = new InputStreamDataInput(rawIn);
+       BufferedOutputStream rawOut = new BufferedOutputStream(socket.getOutputStream(), bufferSize);
+       DataOutput dataOut = new OutputStreamDataOutput(rawOut);
+
+       if (node instanceof SimplePrimaryNode) {
+         SimplePrimaryNode primary = (SimplePrimaryNode) node;
+         primary.handleOneConnection(random(), ss, stop, rawIn, socket, dataIn, dataOut, rawOut);
+       } else {
+         SimpleReplicaNode replica = (SimpleReplicaNode) node;
+         replica.handleOneConnection(ss, stop, rawIn, socket, dataIn, dataOut, rawOut);
+       }
+
+       rawOut.flush();
+       if (Node.VERBOSE_CONNECTIONS) {
+         node.message("bos.flush done");
+       }
+
+       success = true;
+     } catch (SocketException se) {
+       node.message("SocketException " + se + " handling client connection; ignoring");
+     } catch (Throwable t) {
+       node.message("unexpected exception handling client connection:");
+       t.printStackTrace(System.out);
+       // Test should fail with this:
+       throw new RuntimeException(t);
+     } finally {
+       if (success == false) {
+         // Already failing: do not let a close failure mask the original problem
+         IOUtils.closeWhileHandlingException(socket);
+       } else {
+         try {
+           IOUtils.close(socket);
+         } catch (IOException ioe) {
+           throw new RuntimeException(ioe);
+         }
+       }
+     }
+     if (Node.VERBOSE_CONNECTIONS) {
+       node.message("socket.close done");
+     }
+   }
+ }
+
+ /**
+ * Hard-kills this JVM to simulate a node crash.  On JVMs whose vendor string starts with Oracle, Sun or Apple it
+ * first attempts an invalid write through sun.misc.Unsafe.putAddress(0, 0); otherwise, or if that attempt throws,
+ * it falls back to Runtime.halt(-1).  Throws RuntimeException if the JVM is somehow still running afterwards.
+ */
+
+ // poached from TestIndexWriterOnJRECrash ... should we factor out to TestUtil? seems dangerous to give it such "publicity"?
+ private static void crashJRE() {
+ final String vendor = Constants.JAVA_VENDOR;
+ final boolean supportsUnsafeNpeDereference =
+ vendor.startsWith("Oracle") ||
+ vendor.startsWith("Sun") ||
+ vendor.startsWith("Apple");
+
+ try {
+ if (supportsUnsafeNpeDereference) {
+ try {
+ Class<?> clazz = Class.forName("sun.misc.Unsafe");
+ java.lang.reflect.Field field = clazz.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ Object o = field.get(null);
+ Method m = clazz.getMethod("putAddress", long.class, long.class);
+ m.invoke(o, 0L, 0L); // write to address 0: expected to take the whole process down
+ } catch (Throwable e) {
+ System.out.println("Couldn't kill the JVM via Unsafe.");
+ e.printStackTrace(System.out);
+ }
+ }
+
+ // Fallback attempt to Runtime.halt();
+ Runtime.getRuntime().halt(-1);
+ } catch (Exception e) {
+ System.out.println("Couldn't kill the JVM.");
+ e.printStackTrace(System.out);
+ }
+
+ // We couldn't get the JVM to crash for some reason.
+ throw new RuntimeException("JVM refuses to die!");
+ }
+
+ /**
+  * Serializes {@code files} to {@code out}: a vInt entry count, then for each entry the file name,
+  * vLong length, vLong checksum, and the header and footer bytes (each prefixed by its vInt length).
+  */
+ static void writeFilesMetaData(DataOutput out, Map<String,FileMetaData> files) throws IOException {
+   out.writeVInt(files.size());
+   for (Map.Entry<String,FileMetaData> entry : files.entrySet()) {
+     FileMetaData metaData = entry.getValue();
+     out.writeString(entry.getKey());
+     out.writeVLong(metaData.length);
+     out.writeVLong(metaData.checksum);
+     writeLengthPrefixedBytes(out, metaData.header);
+     writeLengthPrefixedBytes(out, metaData.footer);
+   }
+ }
+
+ /** Writes {@code bytes.length} as a vInt followed by the bytes themselves. */
+ private static void writeLengthPrefixedBytes(DataOutput out, byte[] bytes) throws IOException {
+   out.writeVInt(bytes.length);
+   out.writeBytes(bytes, 0, bytes.length);
+ }
+
+ /**
+  * Reads the per-file metadata map in the wire format produced by writeFilesMetaData: a vInt entry
+  * count, then for each entry the file name, vLong length, vLong checksum, and the length-prefixed
+  * header and footer bytes.
+  */
+ static Map<String,FileMetaData> readFilesMetaData(DataInput in) throws IOException {
+   Map<String,FileMetaData> result = new HashMap<>();
+   for (int remaining = in.readVInt(); remaining > 0; remaining--) {
+     String name = in.readString();
+     long fileLength = in.readVLong();
+     long fileChecksum = in.readVLong();
+     byte[] headerBytes = readLengthPrefixedBytes(in);
+     byte[] footerBytes = readLengthPrefixedBytes(in);
+     result.put(name, new FileMetaData(headerBytes, footerBytes, fileLength, fileChecksum));
+   }
+   return result;
+ }
+
+ /** Reads a vInt length and then that many bytes. */
+ private static byte[] readLengthPrefixedBytes(DataInput in) throws IOException {
+   byte[] bytes = new byte[in.readVInt()];
+   in.readBytes(bytes, 0, bytes.length);
+   return bytes;
+ }
+
+ /**
+  * Decodes a CopyState from the wire: length-prefixed infos bytes, vLong gen, vLong version, the
+  * files metadata map, a counted list of completed merge file names, and finally the vLong
+  * primary gen.  The last constructor argument of the returned CopyState is always null.
+  */
+ static CopyState readCopyState(DataInput in) throws IOException {
+
+   int infosLength = in.readVInt();
+   byte[] infosBytes = new byte[infosLength];
+   in.readBytes(infosBytes, 0, infosLength);
+
+   long gen = in.readVLong();
+   long version = in.readVLong();
+   Map<String,FileMetaData> files = readFilesMetaData(in);
+
+   Set<String> completedMergeFiles = new HashSet<>();
+   for (int remaining = in.readVInt(); remaining > 0; remaining--) {
+     String mergedFileName = in.readString();
+     completedMergeFiles.add(mergedFileName);
+   }
+
+   long primaryGen = in.readVLong();
+
+   return new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
+ }
+
+ public void test() throws Exception { // Entry point of the child process: builds one primary or replica node and serves connections until the server socket is closed
+ // All configuration is read from tests.nrtreplication.* system properties:
+ int id = Integer.parseInt(System.getProperty("tests.nrtreplication.nodeid"));
+ Thread.currentThread().setName("init child " + id);
+ Path indexPath = Paths.get(System.getProperty("tests.nrtreplication.indexpath"));
+ boolean isPrimary = System.getProperty("tests.nrtreplication.isPrimary") != null; // presence alone marks a primary; the value is ignored
+ int primaryTCPPort;
+ long forcePrimaryVersion;
+ if (isPrimary == false) {
+ forcePrimaryVersion = -1;
+ primaryTCPPort = Integer.parseInt(System.getProperty("tests.nrtreplication.primaryTCPPort"));
+ } else {
+ primaryTCPPort = -1;
+ forcePrimaryVersion = Long.parseLong(System.getProperty("tests.nrtreplication.forcePrimaryVersion"));
+ }
+ long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
+ Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
+ // A primary is never randomly closed, only (optionally) crashed:
+ boolean doRandomCrash = isPrimary ? TestNRTReplication.DO_CRASH_PRIMARY : TestNRTReplication.DO_CRASH_REPLICA;
+ boolean doRandomClose = isPrimary ? false : TestNRTReplication.DO_CLOSE_REPLICA;
+
+ // Create server socket that we listen for incoming requests on:
+ try (final ServerSocket ss = new ServerSocket(0)) { // port 0: the system picks a free port
+
+ int tcpPort = ((InetSocketAddress) ss.getLocalSocketAddress()).getPort();
+ System.out.println("\nPORT: " + tcpPort); // NOTE(review): presumably parsed from stdout by the launching process -- confirm
+ final Node node;
+ if (isPrimary) {
+ node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null);
+ System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
+ } else {
+ node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+ }
+ System.out.println("\nINFOS VERSION: " + node.getCurrentSearchingVersion());
+ // Optionally schedule a random close or crash of this node:
+ if (doRandomClose || doRandomCrash) {
+ final int waitForMS;
+ if (isPrimary) {
+ waitForMS = TestUtil.nextInt(random(), 20000, 60000);
+ } else {
+ waitForMS = TestUtil.nextInt(random(), 5000, 60000);
+ }
+ // Close if crashing is disabled, crash if closing is disabled, otherwise flip a coin:
+ boolean doClose;
+ if (doRandomCrash == false) {
+ doClose = true;
+ } else if (doRandomClose) {
+ doClose = random().nextBoolean();
+ } else {
+ doClose = false;
+ }
+
+ if (doClose) {
+ node.message("top: will close after " + (waitForMS/1000.0) + " seconds");
+ } else {
+ node.message("top: will crash after " + (waitForMS/1000.0) + " seconds");
+ }
+ // Background thread that waits out the delay (unless stop is set first) and then closes or crashes:
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ long endTime = System.nanoTime() + waitForMS*1000000L;
+ while (System.nanoTime() < endTime) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) { // interrupt is ignored; the loop keeps polling until endTime or stop
+ }
+ if (stop.get()) {
+ break;
+ }
+ }
+
+ if (stop.get() == false) {
+ if (doClose) {
+ try {
+ node.message("top: now force close server socket after " + (waitForMS/1000.0) + " seconds");
+ node.state = "top-closing";
+ ss.close(); // makes the accept loop below fail with SocketException and exit
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ } else {
+ node.message("top: now crash JVM after " + (waitForMS/1000.0) + " seconds");
+ crashJRE();
+ }
+ }
+ }
+ };
+
+ if (isPrimary) {
+ t.setName("crasher P" + id);
+ } else {
+ t.setName("crasher R" + id);
+ }
+
+ // So that if node exits naturally, this thread won't prevent process exit:
+ t.setDaemon(true);
+ t.start();
+ }
+ System.out.println("\nNODE STARTED");
+
+ //List<Thread> clientThreads = new ArrayList<>();
+
+ // Naive thread-per-connection server:
+ while (true) {
+ Socket socket;
+ try {
+ socket = ss.accept();
+ } catch (SocketException se) {
+ // when ClientHandler closes our ss we will hit this
+ node.message("top: server socket exc; now exit");
+ break;
+ }
+ Thread thread = new ClientHandler(ss, node, socket);
+ thread.setDaemon(true);
+ thread.start();
+
+ clientThreads.add(thread);
+
+ // Prune finished client threads:
+ Iterator<Thread> it = clientThreads.iterator();
+ while (it.hasNext()) {
+ Thread t = it.next();
+ if (t.isAlive() == false) {
+ it.remove();
+ }
+ }
+ //node.message(clientThreads.size() + " client threads are still alive");
+ }
+ // Accept loop has exited: tell handler threads and the crasher thread to stop
+ stop.set(true);
+
+ // Make sure all client threads are done, else we get annoying (yet ultimately "harmless") messages about threads still running /
+ // lingering for them to finish from the child processes:
+ for(Thread clientThread : clientThreads) {
+ node.message("top: join clientThread=" + clientThread);
+ clientThread.join();
+ node.message("top: done join clientThread=" + clientThread);
+ }
+ node.message("done join all client threads; now close node");
+ node.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
new file mode 100644
index 0000000..d409ffc
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
@@ -0,0 +1,250 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.RAMOutputStream;
+
+/** This is a stupid yet functional transaction log: it never fsync's, never prunes, it's over-synchronized, it hard-wires id field name to "docid", can
+ * only handle specific docs/fields used by this test, etc. It's just barely enough to show how a translog could work on top of NRT
+ * replication to guarantee no data loss when nodes crash */
+
+ class SimpleTransLog implements Closeable {
+
+   /** Log file channel: ops are appended at its current position and read back with positional reads during replay. */
+   final FileChannel channel;
+
+   /** Staging area where a single op is serialized before it is written to {@link #channel}. */
+   final RAMOutputStream buffer = new RAMOutputStream();
+
+   /** Backing array, and a ByteBuffer view over it, used to write the 4-byte length prefix of each op. */
+   final byte[] intBuffer = new byte[4];
+   final ByteBuffer intByteBuffer = ByteBuffer.wrap(intBuffer);
+
+   // Op codes: the first byte of every serialized op.
+   private final static byte OP_ADD_DOCUMENT = (byte) 0;
+   private final static byte OP_UPDATE_DOCUMENT = (byte) 1;
+   private final static byte OP_DELETE_DOCUMENTS = (byte) 2;
+
+   /**
+    * Creates a new transaction log file at {@code path}.
+    *
+    * @param path location of the log file; it is opened with CREATE_NEW, so it must not exist yet
+    * @throws IOException if the file cannot be created or opened
+    */
+   public SimpleTransLog(Path path) throws IOException {
+     channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+   }
+
+   /** Returns the channel's current position, i.e. the location where the next op will be written. */
+   public synchronized long getNextLocation() throws IOException {
+     return channel.position();
+   }
+
+   /**
+    * Appends an addDocument op.
+    *
+    * @param id value of the doc's "docid" field
+    * @param doc document to log; only the fields handled by {@link #encode} are recorded
+    * @return file position where this op starts
+    */
+   public synchronized long addDocument(String id, Document doc) throws IOException {
+     assert buffer.getFilePointer() == 0;
+     buffer.writeByte(OP_ADD_DOCUMENT);
+     encode(id, doc);
+     return flushBuffer();
+   }
+
+   /**
+    * Appends an updateDocument op.
+    *
+    * @param id value of the doc's "docid" field
+    * @param doc document to log; only the fields handled by {@link #encode} are recorded
+    * @return file position where this op starts
+    */
+   public synchronized long updateDocument(String id, Document doc) throws IOException {
+     assert buffer.getFilePointer() == 0;
+     buffer.writeByte(OP_UPDATE_DOCUMENT);
+     encode(id, doc);
+     return flushBuffer();
+   }
+
+   /**
+    * Appends a deleteDocuments op.
+    *
+    * @param id id of the document to delete
+    * @return file position where this op starts
+    */
+   public synchronized long deleteDocuments(String id) throws IOException {
+     assert buffer.getFilePointer() == 0;
+     buffer.writeByte(OP_DELETE_DOCUMENTS);
+     buffer.writeString(id);
+     return flushBuffer();
+   }
+
+   /**
+    * Writes the staged op to the file as a 4-byte big-endian length followed by the op bytes, then
+    * resets the staging buffer.
+    *
+    * @return file position where the length prefix of this op starts
+    */
+   private synchronized long flushBuffer() throws IOException {
+     long pos = channel.position();
+     int len = (int) buffer.getFilePointer();
+     byte[] bytes = new byte[len];
+     buffer.writeTo(bytes, 0);
+     buffer.reset();
+
+     // A freshly wrapped ByteBuffer is big-endian, which is the byte order replay decodes:
+     intByteBuffer.clear();
+     intByteBuffer.putInt(len);
+     intByteBuffer.flip();
+
+     writeBytesToChannel(intByteBuffer);
+     writeBytesToChannel(ByteBuffer.wrap(bytes));
+
+     return pos;
+   }
+
+   /** Writes every remaining byte of {@code src} at the channel's current position, looping over partial writes. */
+   private void writeBytesToChannel(ByteBuffer src) throws IOException {
+     // Loop on the remaining bytes rather than on limit(), so a buffer whose position is not 0 also terminates:
+     while (src.hasRemaining()) {
+       channel.write(src);
+     }
+   }
+
+   /**
+    * Fills {@code dest}, from its position up to its limit, with bytes read starting at file position {@code pos}.
+    * Uses positional reads, which do not move the channel's own position.
+    *
+    * @throws EOFException if the file ends before {@code dest} is full
+    */
+   private void readBytesFromChannel(long pos, ByteBuffer dest) throws IOException {
+     while (dest.hasRemaining()) {
+       int inc = channel.read(dest, pos);
+       if (inc < 0) {
+         throw new EOFException();
+       }
+       pos += inc;
+     }
+   }
+
+   /**
+    * Replays the ops stored between file positions {@code start} and {@code end} by sending them to
+    * {@code primary} over a new connection.  Can run concurrently with ongoing operations: this method
+    * only reads the file with positional reads and uses its own scratch buffers.
+    *
+    * @param primary node that receives the replayed ops
+    * @param start file position of the first op to replay
+    * @param end file position just past the last op to replay
+    * @throws CorruptIndexException if an op has an invalid length or an unknown op code
+    * @throws EOFException if the file ends before an op could be fully read
+    */
+   public void replay(NodeProcess primary, long start, long end) throws IOException {
+     try (Connection c = new Connection(primary.tcpPort)) {
+       c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+       // Local scratch buffer; the intByteBuffer field is left to the synchronized append path:
+       ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+       ByteArrayDataInput in = new ByteArrayDataInput();
+
+       long pos = start;
+       while (pos < end) {
+         lenBuffer.clear();
+         readBytesFromChannel(pos, lenBuffer);
+         pos += 4;
+         int len = lenBuffer.getInt(0);
+
+         // Every op holds at least its op-code byte and must not run past the requested end;
+         // reject anything else before allocating a buffer for it:
+         if (len <= 0 || len > end - pos) {
+           throw new CorruptIndexException("invalid operation length " + len + " at position " + (pos - 4), in);
+         }
+
+         byte[] bytes = new byte[len];
+         readBytesFromChannel(pos, ByteBuffer.wrap(bytes));
+         pos += len;
+
+         in.reset(bytes);
+
+         byte op = in.readByte();
+         switch (op) {
+           case OP_ADD_DOCUMENT:
+           case OP_UPDATE_DOCUMENT:
+             // Adds and updates share one replay path:
+             replayAddDocument(c, primary, in);
+             break;
+
+           case OP_DELETE_DOCUMENTS:
+             replayDeleteDocuments(c, primary, in);
+             break;
+
+           default:
+             throw new CorruptIndexException("invalid operation " + op, in);
+         }
+       }
+       assert pos == end;
+       c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
+       c.flush();
+       // Block until the primary sends one byte back (presumably its acknowledgement):
+       c.in.readByte();
+     }
+   }
+
+   /** Decodes one add/update op from {@code in}, rebuilds the document and sends it to {@code primary}. */
+   private void replayAddDocument(Connection c, NodeProcess primary, DataInput in) throws IOException {
+     String id = in.readString();
+
+     Document doc = new Document();
+     doc.add(new StringField("docid", id, Field.Store.YES));
+
+     // Fields are read in the same order encode wrote them: title, body, marker.
+     String title = readNullableString(in);
+     if (title != null) {
+       doc.add(new StringField("title", title, Field.Store.NO));
+       doc.add(new TextField("titleTokenized", title, Field.Store.NO));
+     }
+     String body = readNullableString(in);
+     if (body != null) {
+       doc.add(new TextField("body", body, Field.Store.NO));
+     }
+     String marker = readNullableString(in);
+     if (marker != null) {
+       doc.add(new StringField("marker", marker, Field.Store.YES));
+     }
+
+     // For both add and update originally, we use updateDocument to replay,
+     // because the doc could in fact already be in the index.
+     // NOTE(review): the last argument is false; confirm in NodeProcess that this really selects an update.
+     // nocommit what if this fails?
+     primary.addOrUpdateDocument(c, doc, false);
+   }
+
+   /** Decodes one delete op from {@code in} and sends it to {@code primary}. */
+   private void replayDeleteDocuments(Connection c, NodeProcess primary, DataInput in) throws IOException {
+     String id = in.readString();
+     // nocommit what if this fails?
+     primary.deleteDocument(c, id);
+   }
+
+   /**
+    * Encodes doc into buffer as the id followed by the nullable "title", "body" and "marker" values.
+    * NOTE: this is NOT general purpose! It only handles the fields used in this test!
+    */
+   private synchronized void encode(String id, Document doc) throws IOException {
+     assert id.equals(doc.get("docid")): "id=" + id + " vs docid=" + doc.get("docid");
+     buffer.writeString(id);
+     writeNullableString(doc.get("title"));
+     writeNullableString(doc.get("body"));
+     writeNullableString(doc.get("marker"));
+   }
+
+   /** Writes a lead byte (0 for null, 1 for present) to buffer, followed by the string itself when present. */
+   private synchronized void writeNullableString(String s) throws IOException {
+     if (s == null) {
+       buffer.writeByte((byte) 0);
+     } else {
+       buffer.writeByte((byte) 1);
+       buffer.writeString(s);
+     }
+   }
+
+   /**
+    * Reads a string written by {@link #writeNullableString}.
+    *
+    * @return the decoded string, or null if the lead byte marks it as absent
+    * @throws CorruptIndexException if the lead byte is neither 0 nor 1
+    */
+   private String readNullableString(DataInput in) throws IOException {
+     byte b = in.readByte();
+     if (b == 0) {
+       return null;
+     } else if (b == 1) {
+       return in.readString();
+     } else {
+       throw new CorruptIndexException("invalid string lead byte " + b, in);
+     }
+   }
+
+   /** Closes the underlying file channel. */
+   @Override
+   public synchronized void close() throws IOException {
+     channel.close();
+   }
+ }