You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/12/01 23:59:13 UTC
svn commit: r1209295 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/update/ test/org/apache/solr/cloud/
Author: yonik
Date: Thu Dec 1 22:59:12 2011
New Revision: 1209295
URL: http://svn.apache.org/viewvc?rev=1209295&view=rev
Log:
fix inconsistent synchronization
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Thu Dec 1 22:59:12 2011
@@ -111,10 +111,11 @@ public class AddUpdateCommand extends Up
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (indexedId !=null) sb.append("id=").append(indexedId);
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (indexedId != null) sb.append(",id=").append(indexedId);
if (!overwrite) sb.append(",overwrite=").append(overwrite);
+ if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+ sb.append('}');
return sb.toString();
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Thu Dec 1 22:59:12 2011
@@ -94,11 +94,13 @@ public class DeleteUpdateCommand extends
@Override
public String toString() {
- StringBuilder sb = new StringBuilder(commandName);
- sb.append(':');
- if (id!=null) sb.append("id=").append(getId());
- else sb.append("query=`").append(query).append('`');
- return sb.toString();
+ StringBuilder sb = new StringBuilder(super.toString());
+ if (id!=null) sb.append(",id=").append(getId());
+ if (indexedId!=null) sb.append(",indexedId=").append(getId());
+ if (query != null) sb.append(",query=`").append(query).append('`');
+ // if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+ sb.append('}');
+ return sb.toString();
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Thu Dec 1 22:59:12 2011
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FilenameFilter;
@@ -122,6 +124,8 @@ class NullUpdateLog extends UpdateLog {
/** @lucene.experimental */
public class FSUpdateLog extends UpdateLog {
+ public static Logger log = LoggerFactory.getLogger(FSUpdateLog.class);
+ public boolean debug = log.isDebugEnabled();
public static String TLOG_NAME="tlog";
@@ -247,7 +251,9 @@ public class FSUpdateLog extends UpdateL
map.put(cmd.getIndexedId(), ptr);
}
- // SolrCore.verbose("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ if (debug) {
+ log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
}
}
@@ -273,7 +279,9 @@ public class FSUpdateLog extends UpdateL
oldDeletes.put(br, ptr);
}
- // SolrCore.verbose("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ if (debug) {
+ log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
}
}
@@ -293,7 +301,10 @@ public class FSUpdateLog extends UpdateL
}
LogPtr ptr = new LogPtr(pos, cmd.getVersion());
- // SolrCore.verbose("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+
+ if (debug) {
+ log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
}
}
@@ -316,6 +327,11 @@ public class FSUpdateLog extends UpdateL
@Override
public void preCommit(CommitUpdateCommand cmd) {
synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: preCommit");
+ }
+
+
// since we're changing the log, we must change the map.
newMap();
@@ -335,6 +351,9 @@ public class FSUpdateLog extends UpdateL
@Override
public void postCommit(CommitUpdateCommand cmd) {
synchronized (this) {
+ if (debug) {
+ log.debug("TLOG: postCommit");
+ }
if (prevTlog != null) {
prevTlog.decref();
prevTlog = null;
@@ -344,7 +363,10 @@ public class FSUpdateLog extends UpdateL
@Override
public void preSoftCommit(CommitUpdateCommand cmd) {
+ debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+
synchronized (this) {
+
if (!cmd.softCommit) return; // already handled this at the start of the hard commit
newMap();
@@ -353,7 +375,10 @@ public class FSUpdateLog extends UpdateL
// But we do know that any updates already added will definitely
// show up in the latest reader after the commit succeeds.
map = new HashMap<BytesRef, LogPtr>();
- // SolrCore.verbose("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+
+ if (debug) {
+ log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+ }
}
}
@@ -365,8 +390,11 @@ public class FSUpdateLog extends UpdateL
// it being called in the middle of a preSoftCommit, postSoftCommit sequence.
// If this DUH2 synchronization were to be removed, preSoftCommit should
// record what old maps were created and only remove those.
+
+ if (debug) {
+ SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2));
+ }
clearOldMaps();
- // SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap));
}
}
@@ -532,15 +560,22 @@ public class FSUpdateLog extends UpdateL
assert state == State.ACTIVE;
recoveryInfo = new RecoveryInfo();
+ // TODO: currently we don't keep track of where we are in an existing
+ // transaction log (if there have already been updates) and
+ // we start at the beginning when we replay.
+
// block all updates to eliminate race conditions
// reading state and acting on it in the update processor
versionInfo.blockUpdates();
try {
+ if (log.isInfoEnabled()) {
+ log.info("Starting to buffer updates. " + this);
+ }
+
state = State.BUFFERING;
} finally {
versionInfo.unblockUpdates();
}
- recoveryInfo = new RecoveryInfo();
}
/** Returns the Future to wait on, or null if no replay was needed */
@@ -580,6 +615,11 @@ public class FSUpdateLog extends UpdateL
return state;
}
+ @Override
+ public String toString() {
+ return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}";
+ }
+
public static Runnable testing_logReplayHook; // called before each log read
public static Runnable testing_logReplayFinishHook; // called when log replay has finished
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Thu Dec 1 22:59:12 2011
@@ -24,6 +24,8 @@ import org.apache.solr.common.util.FastI
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.zookeeper.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
*
*/
public class TransactionLog {
+ public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
public final static String END_MESSAGE="SOLR_TLOG_END";
@@ -59,13 +62,14 @@ public class TransactionLog {
RandomAccessFile raf;
FileChannel channel;
OutputStream os;
- FastOutputStream fos; // all accesses to this stream should be synchronized on "this"
+ FastOutputStream fos; // all accesses to this stream should be synchronized on "this" (The TransactionLog)
volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
AtomicInteger refcount = new AtomicInteger(1);
Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
List<String> globalStringList = new ArrayList<String>();
+ final boolean debug = log.isDebugEnabled();
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@@ -136,6 +140,10 @@ public class TransactionLog {
TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
try {
+ if (debug) {
+ log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size="+tlogFile.length() + ", openExisting=" + openExisting);
+ }
+
this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw");
long start = raf.length();
@@ -195,7 +203,7 @@ public class TransactionLog {
}
Collection<String> getGlobalStrings() {
- synchronized (fos) {
+ synchronized (this) {
return new ArrayList<String>(globalStringList);
}
}
@@ -210,9 +218,10 @@ public class TransactionLog {
public long write(AddUpdateCommand cmd) {
LogCodec codec = new LogCodec();
- synchronized (fos) {
+ long pos = 0;
+ synchronized (this) {
try {
- long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+ pos = fos.size(); // if we had flushed, this should be equal to channel.position()
SolrInputDocument sdoc = cmd.getSolrInputDocument();
if (pos == 0) { // TODO: needs to be changed if we start writing a header first
@@ -238,14 +247,15 @@ public class TransactionLog {
return pos;
} catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ // TODO: reset our file pointer back to "pos", the start of this record.
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
}
}
}
public long writeDelete(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec();
- synchronized (fos) {
+ synchronized (this) {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
@@ -268,7 +278,7 @@ public class TransactionLog {
public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec();
- synchronized (fos) {
+ synchronized (this) {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
@@ -291,7 +301,7 @@ public class TransactionLog {
public long writeCommit(CommitUpdateCommand cmd) {
LogCodec codec = new LogCodec();
- synchronized (fos) {
+ synchronized (this) {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
@@ -320,7 +330,7 @@ public class TransactionLog {
try {
// make sure any unflushed buffer has been flushed
- synchronized (fos) {
+ synchronized (this) {
// TODO: optimize this by keeping track of what we have flushed up to
fos.flushBuffer();
/***
@@ -355,7 +365,7 @@ public class TransactionLog {
public void finish(UpdateLog.SyncLevel syncLevel) {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
- synchronized (fos) {
+ synchronized (this) {
fos.flushBuffer();
}
@@ -373,6 +383,10 @@ public class TransactionLog {
private void close() {
try {
+ if (debug) {
+ log.debug("Closing " + this);
+ }
+
fos.flush();
fos.close();
if (deleteOnClose) {
@@ -388,7 +402,7 @@ public class TransactionLog {
}
/** Returns a reader that can be used while a log is still in use.
- * Currently only *one* log may be outstanding, and that log may only
+ * Currently only *one* LogReader may be outstanding, and that log may only
* be used from a single thread. */
public LogReader getReader() {
return new LogReader();
@@ -412,7 +426,12 @@ public class TransactionLog {
public Object next() throws IOException, InterruptedException {
long pos = fis.position();
+
synchronized (TransactionLog.this) {
+ if (debug) {
+ log.debug("Reading log record. pos="+pos+" currentSize="+fos.size());
+ }
+
if (pos >= fos.size()) {
return null;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Thu Dec 1 22:59:12 2011
@@ -42,7 +42,7 @@ public class UpdateCommand implements Cl
@Override
public String toString() {
- return commandName;
+ return commandName + ":{flags="+flags+", version="+version;
}
public long getVersion() {
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java Thu Dec 1 22:59:12 2011
@@ -35,7 +35,6 @@ import org.junit.Ignore;
/**
*
*/
-@Ignore("this test still fails sometimes - it seems usually due to replay failing")
public class RecoveryZkTest extends FullDistributedZkTest {