You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/12/01 23:59:13 UTC

svn commit: r1209295 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/ test/org/apache/solr/cloud/

Author: yonik
Date: Thu Dec  1 22:59:12 2011
New Revision: 1209295

URL: http://svn.apache.org/viewvc?rev=1209295&view=rev
Log:
fix inconsistent synchronization

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Thu Dec  1 22:59:12 2011
@@ -111,10 +111,11 @@ public class AddUpdateCommand extends Up
 
    @Override
   public String toString() {
-     StringBuilder sb = new StringBuilder(commandName);
-     sb.append(':');
-     if (indexedId !=null) sb.append("id=").append(indexedId);
+     StringBuilder sb = new StringBuilder(super.toString());
+     if (indexedId != null) sb.append(",id=").append(indexedId);
      if (!overwrite) sb.append(",overwrite=").append(overwrite);
+     if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+     sb.append('}');
      return sb.toString();
    }
  }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java Thu Dec  1 22:59:12 2011
@@ -94,11 +94,13 @@ public class DeleteUpdateCommand extends
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder(commandName);
-    sb.append(':');
-    if (id!=null) sb.append("id=").append(getId());
-    else sb.append("query=`").append(query).append('`');
-    return sb.toString();
+    StringBuilder sb = new StringBuilder(super.toString());
+    if (id!=null) sb.append(",id=").append(getId());
+    if (indexedId!=null) sb.append(",indexedId=").append(getId());
+    if (query != null) sb.append(",query=`").append(query).append('`');
+    // if (commitWithin != -1) sb.append(",commitWithin=").append(commitWithin);
+     sb.append('}');
+     return sb.toString();
   }
 
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Thu Dec  1 22:59:12 2011
@@ -31,6 +31,8 @@ import org.apache.solr.update.processor.
 import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
 import org.apache.solr.update.processor.RunUpdateProcessorFactory;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -122,6 +124,8 @@ class NullUpdateLog extends UpdateLog {
 
 /** @lucene.experimental */
 public class FSUpdateLog extends UpdateLog {
+  public static Logger log = LoggerFactory.getLogger(FSUpdateLog.class);
+  public boolean debug = log.isDebugEnabled();
 
   public static String TLOG_NAME="tlog";
 
@@ -247,7 +251,9 @@ public class FSUpdateLog extends UpdateL
         map.put(cmd.getIndexedId(), ptr);
       }
 
-      // SolrCore.verbose("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      if (debug) {
+        log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
     }
   }
 
@@ -273,7 +279,9 @@ public class FSUpdateLog extends UpdateL
         oldDeletes.put(br, ptr);
       }
 
-      // SolrCore.verbose("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      if (debug) {
+        log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
     }
   }
 
@@ -293,7 +301,10 @@ public class FSUpdateLog extends UpdateL
       }
 
       LogPtr ptr = new LogPtr(pos, cmd.getVersion());
-      // SolrCore.verbose("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+
+      if (debug) {
+        log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      }
     }
   }
 
@@ -316,6 +327,11 @@ public class FSUpdateLog extends UpdateL
   @Override
   public void preCommit(CommitUpdateCommand cmd) {
     synchronized (this) {
+      if (debug) {
+        log.debug("TLOG: preCommit");
+      }
+
+
       // since we're changing the log, we must change the map.
       newMap();
 
@@ -335,6 +351,9 @@ public class FSUpdateLog extends UpdateL
   @Override
   public void postCommit(CommitUpdateCommand cmd) {
     synchronized (this) {
+      if (debug) {
+        log.debug("TLOG: postCommit");
+      }
       if (prevTlog != null) {
         prevTlog.decref();
         prevTlog = null;
@@ -344,7 +363,10 @@ public class FSUpdateLog extends UpdateL
 
   @Override
   public void preSoftCommit(CommitUpdateCommand cmd) {
+    debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+
     synchronized (this) {
+
       if (!cmd.softCommit) return;  // already handled this at the start of the hard commit
       newMap();
 
@@ -353,7 +375,10 @@ public class FSUpdateLog extends UpdateL
       // But we do know that any updates already added will definitely
       // show up in the latest reader after the commit succeeds.
       map = new HashMap<BytesRef, LogPtr>();
-      // SolrCore.verbose("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+
+      if (debug) {
+        log.debug("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map));
+      }
     }
   }
 
@@ -365,8 +390,11 @@ public class FSUpdateLog extends UpdateL
       // it being called in the middle of a preSoftCommit, postSoftCommit sequence.
       // If this DUH2 synchronization were to be removed, preSoftCommit should
       // record what old maps were created and only remove those.
+
+      if (debug) {
+        SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap) + ", prevMap2=" + System.identityHashCode(prevMap2));
+      }
       clearOldMaps();
-      // SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap));
     }
   }
 
@@ -532,15 +560,22 @@ public class FSUpdateLog extends UpdateL
     assert state == State.ACTIVE;
     recoveryInfo = new RecoveryInfo();
 
+    // TODO: currently we don't keep track of where we are in an existing
+    // transaction log (if there have already been updates) and
+    // we start at the beginning when we replay.
+
     // block all updates to eliminate race conditions
     // reading state and acting on it in the update processor
     versionInfo.blockUpdates();
     try {
+      if (log.isInfoEnabled()) {
+        log.info("Starting to buffer updates. " + this);
+      }
+
       state = State.BUFFERING;
     } finally {
       versionInfo.unblockUpdates();
     }
-    recoveryInfo = new RecoveryInfo();
   }
 
   /** Returns the Future to wait on, or null if no replay was needed */
@@ -580,6 +615,11 @@ public class FSUpdateLog extends UpdateL
     return state;
   }
 
+  @Override
+  public String toString() {
+    return "FSUpdateLog{state="+getState()+", tlog="+tlog+"}";
+  }
+
 
   public static Runnable testing_logReplayHook;  // called before each log read
   public static Runnable testing_logReplayFinishHook;  // called when log replay has finished

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Thu Dec  1 22:59:12 2011
@@ -24,6 +24,8 @@ import org.apache.solr.common.util.FastI
 import org.apache.solr.common.util.FastOutputStream;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.zookeeper.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.nio.ByteBuffer;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.Atomi
  *
  */
 public class TransactionLog {
+  public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
 
   public final static String END_MESSAGE="SOLR_TLOG_END";
 
@@ -59,13 +62,14 @@ public class TransactionLog {
   RandomAccessFile raf;
   FileChannel channel;
   OutputStream os;
-  FastOutputStream fos;    // all accesses to this stream should be synchronized on "this"
+  FastOutputStream fos;    // all accesses to this stream should be synchronized on "this" (The TransactionLog)
 
   volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
 
   AtomicInteger refcount = new AtomicInteger(1);
   Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
   List<String> globalStringList = new ArrayList<String>();
+  final boolean debug = log.isDebugEnabled();
 
   // write a BytesRef as a byte array
   JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@@ -136,6 +140,10 @@ public class TransactionLog {
 
   TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
     try {
+      if (debug) {
+        log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size="+tlogFile.length() + ", openExisting=" + openExisting);
+      }
+
       this.tlogFile = tlogFile;
       raf = new RandomAccessFile(this.tlogFile, "rw");
       long start = raf.length();
@@ -195,7 +203,7 @@ public class TransactionLog {
   }
 
   Collection<String> getGlobalStrings() {
-    synchronized (fos) {
+    synchronized (this) {
       return new ArrayList<String>(globalStringList);
     }
   }
@@ -210,9 +218,10 @@ public class TransactionLog {
 
   public long write(AddUpdateCommand cmd) {
     LogCodec codec = new LogCodec();
-    synchronized (fos) {
+    long pos = 0;
+    synchronized (this) {
       try {
-        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         SolrInputDocument sdoc = cmd.getSolrInputDocument();
 
         if (pos == 0) { // TODO: needs to be changed if we start writing a header first
@@ -238,14 +247,15 @@ public class TransactionLog {
 
         return pos;
       } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        // TODO: reset our file pointer back to "pos", the start of this record.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
       }
     }
   }
 
   public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec();
-    synchronized (fos) {
+    synchronized (this) {
       try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         if (pos == 0) {
@@ -268,7 +278,7 @@ public class TransactionLog {
 
   public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec();
-    synchronized (fos) {
+    synchronized (this) {
       try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         if (pos == 0) {
@@ -291,7 +301,7 @@ public class TransactionLog {
 
   public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec();
-    synchronized (fos) {
+    synchronized (this) {
       try {
         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
         if (pos == 0) {
@@ -320,7 +330,7 @@ public class TransactionLog {
 
     try {
       // make sure any unflushed buffer has been flushed
-      synchronized (fos) {
+      synchronized (this) {
         // TODO: optimize this by keeping track of what we have flushed up to
         fos.flushBuffer();
         /***
@@ -355,7 +365,7 @@ public class TransactionLog {
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {
-      synchronized (fos) {
+      synchronized (this) {
         fos.flushBuffer();
       }
 
@@ -373,6 +383,10 @@ public class TransactionLog {
 
   private void close() {
     try {
+      if (debug) {
+        log.debug("Closing " + this);
+      }
+
       fos.flush();
       fos.close();
       if (deleteOnClose) {
@@ -388,7 +402,7 @@ public class TransactionLog {
   }
 
   /** Returns a reader that can be used while a log is still in use.
-   * Currently only *one* log may be outstanding, and that log may only
+   * Currently only *one* LogReader may be outstanding, and that log may only
    * be used from a single thread. */
   public LogReader getReader() {
     return new LogReader();
@@ -412,7 +426,12 @@ public class TransactionLog {
     public Object next() throws IOException, InterruptedException {
       long pos = fis.position();
 
+
       synchronized (TransactionLog.this) {
+        if (debug) {
+          log.debug("Reading log record.  pos="+pos+" currentSize="+fos.size());
+        }
+
         if (pos >= fos.size()) {
           return null;
         }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Thu Dec  1 22:59:12 2011
@@ -42,7 +42,7 @@ public class UpdateCommand implements Cl
 
   @Override
   public String toString() {
-    return commandName;
+    return commandName + ":{flags="+flags+", version="+version;
   }
 
   public long getVersion() {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java?rev=1209295&r1=1209294&r2=1209295&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java Thu Dec  1 22:59:12 2011
@@ -35,7 +35,6 @@ import org.junit.Ignore;
 /**
  *
  */
-@Ignore("this test still fails sometimes - it seems usually due to replay failing")
 public class RecoveryZkTest extends FullDistributedZkTest {