You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/10/07 04:04:05 UTC

svn commit: r1179922 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ solrj/src/java/org/apache/solr/common/util/

Author: yonik
Date: Fri Oct  7 02:04:04 2011
New Revision: 1179922

URL: http://svn.apache.org/viewvc?rev=1179922&view=rev
Log:
SOLR-2816: start versioning

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java   (with props)
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Fri Oct  7 02:04:04 2011
@@ -42,10 +42,7 @@ import org.apache.solr.search.ValueSourc
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.SolrIndexWriter;
 import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.processor.LogUpdateProcessorFactory;
-import org.apache.solr.update.processor.RunUpdateProcessorFactory;
-import org.apache.solr.update.processor.UpdateRequestProcessorChain;
-import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.apache.solr.update.processor.*;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.apache.solr.util.plugin.SolrCoreAware;
@@ -661,6 +658,7 @@ public final class SolrCore implements S
       // construct the default chain
       UpdateRequestProcessorFactory[] factories = new UpdateRequestProcessorFactory[]{
               new LogUpdateProcessorFactory(),
+              new VersionProcessorFactory(),
               new RunUpdateProcessorFactory()
       };
       def = new UpdateRequestProcessorChain(factories, this);

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Fri Oct  7 02:04:04 2011
@@ -17,6 +17,7 @@
 
 package org.apache.solr.update;
 
+import org.apache.lucene.util.BitUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -26,6 +27,7 @@ import org.apache.solr.common.util.JavaB
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
 
 import java.io.*;
 import java.nio.ByteBuffer;
@@ -33,6 +35,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** @lucene.experimental */
 class NullUpdateLog extends UpdateLog {
@@ -80,6 +83,11 @@ class NullUpdateLog extends UpdateLog {
   @Override
   public void close() {
   }
+
+  @Override
+  public VersionInfo getVersionInfo() {
+    return null;
+  }
 }
 
 /** @lucene.experimental */
@@ -105,6 +113,12 @@ public class FSUpdateLog extends UpdateL
   private String dataDir;
   private String lastDataDir;
 
+  private VersionInfo versionInfo;
+  @Override
+  public VersionInfo getVersionInfo() {
+    return versionInfo;
+  }
+
   @Override
   public void init(PluginInfo info) {
     dataDir = (String)info.initArgs.get("dir");
@@ -124,6 +138,8 @@ public class FSUpdateLog extends UpdateL
     tlogDir.mkdirs();
     tlogFiles = getLogList(tlogDir);
     id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
+
+    versionInfo = new VersionInfo(uhandler, 64);
   }
 
   static class LogPtr {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Fri Oct  7 02:04:04 2011
@@ -27,6 +27,7 @@ import org.apache.solr.request.SolrQuery
   public class UpdateCommand {
     protected final SolrQueryRequest req;
     protected final String commandName;
+    protected long version;
 
     public UpdateCommand(String commandName, SolrQueryRequest req) {
       this.req = req;
@@ -37,4 +38,11 @@ import org.apache.solr.request.SolrQuery
     public String toString() {
       return commandName;
     }
+
+    public long getVersion() {
+      return version;
+    }
+    public void setVersion(long version) {
+      this.version = version;
+    }
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Oct  7 02:04:04 2011
@@ -37,4 +37,5 @@ public abstract class UpdateLog implemen
   public abstract void postSoftCommit(CommitUpdateCommand cmd);
   public abstract Object lookup(BytesRef indexedId);
   public abstract void close();
+  public abstract VersionInfo getVersionInfo();
 }

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java Fri Oct  7 02:04:04 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+/**
+ * One slot of the versioning lock table. {@code VersionInfo#bucket(int)} maps an id hash to
+ * an instance of this class, and update processors synchronize on that instance so updates
+ * to ids in the same bucket are applied one at a time.
+ *
+ * TODO: make inner?
+ * TODO: store the highest possible in the index on a commit (but how to not block adds?)
+ * TODO: could also store highest possible in the transaction log after a commit.
+ *       Or on a new index, just scan "version" for the max?
+ */
+public class VersionBucket {
+  // NOTE(review): presumably the highest version handed out for ids in this bucket (see the
+  // TODOs above); nothing in this commit reads or writes it yet, so it stays at its default 0.
+  long highest;
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Fri Oct  7 02:04:04 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BitUtil;
+import org.apache.solr.schema.SchemaField;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Per-update-log versioning state: the schema's version field (if any), a clock that hands
+ * out new version numbers, and a fixed, power-of-two sized table of {@link VersionBucket}s
+ * that update processors synchronize on to order updates to the same id.
+ */
+public class VersionInfo {
+  /** Name of the schema field that holds a document's version. */
+  public static final String VERSION_FIELD="_version_";
+
+  private final VersionBucket[] buckets;
+  private final SchemaField versionField;
+
+  /**
+   * @param updateHandler handler whose core schema is searched for {@link #VERSION_FIELD}
+   * @param nBuckets requested number of buckets; rounded up to a power of two
+   *                 (via {@code BitUtil.nextHighestPowerOfTwo})
+   * @throws IllegalArgumentException if {@code nBuckets} is not positive, since a
+   *         zero-length table would make every {@link #bucket(int)} call fail
+   */
+  public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
+    if (nBuckets <= 0) {
+      throw new IllegalArgumentException("nBuckets must be positive: " + nBuckets);
+    }
+    // Use the shared constant so the lookup cannot drift from the field name callers write.
+    versionField = updateHandler.core.getSchema().getFieldOrNull(VERSION_FIELD);
+    // A power-of-two length lets bucket() pick a slot with a bit mask instead of a modulo.
+    buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
+    for (int i=0; i<buckets.length; i++) {
+      buckets[i] = new VersionBucket();
+    }
+  }
+
+  /**
+   * @return the schema field named {@link #VERSION_FIELD}, or null if the schema does not
+   *         define one
+   */
+  public SchemaField getVersionField() {
+    return versionField;
+  }
+
+
+  // todo: initialize... use current time to start?
+  // a clock that increments by 1 for every operation makes it easier to detect missing
+  // messages, but raises other issues:
+  // - need to initialize to largest thing in index or tlog
+  // - when becoming leader, need to make sure it's greater than
+  // - using to detect missing messages means we need to keep track per-leader, or make
+  //   sure a new leader starts off with 1 greater than the last leader.
+  private final AtomicLong clock = new AtomicLong();
+
+  /**
+   * Advances the clock and returns the new value. The clock starts at 0, so the first
+   * value returned is 1. Values are unique and increasing across threads.
+   */
+  public long getNewClock() {
+    return clock.incrementAndGet();
+  }
+
+  // Named *old* to prevent accidental calling getClock and expecting a new updated clock.
+  /** Returns the most recently issued clock value without advancing it (0 if none yet). */
+  public long getOldClock() {
+    return clock.get();
+  }
+
+  /***
+  // Time-based lamport clock.  Good for introducing some reality into clocks (to the degree
+  // that times are somewhat synchronized in the cluster).
+  // Good if we want to relax some constraints to scale down to where only one node may be
+  // up at a time.  Harder to detect missing messages (because versions are not contiguous.
+  long vclock;
+  long time;
+  private final Object clockSync = new Object();
+
+
+  public long getNewClock() {
+    synchronized (clockSync) {
+      time = System.currentTimeMillis();
+      long result = time << 20;
+      if (result <= vclock) {
+        result = vclock + 1;
+      }
+      vclock = result;
+      return vclock;
+    }
+  }
+
+  public long getOldClock() {
+    synchronized (clockSync) {
+      return vclock;
+    }
+  }
+
+  public void updateClock(long clock) {
+    synchronized (clockSync) {
+      vclock = Math.max(vclock, clock);
+    }
+  }
+  ***/
+
+  /**
+   * Maps an id hash to its bucket. Only the low bits of {@code hash} are used, so the
+   * caller is expected to supply a well-mixed hash.
+   */
+  public VersionBucket bucket(int hash) {
+    // If this is a user provided hash, it may be poor in the right-hand bits.
+    // Make sure high bits are moved down, since only the low bits will matter.
+    // int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
+    // Assume good hash codes for now.
+
+    int slot = hash & (buckets.length-1);
+    return buckets[slot];
+  }
+
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java Fri Oct  7 02:04:04 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.*;
+
+
+/**
+ * Creates a {@link VersionProcessor} for each update request. That processor orders add and
+ * delete-by-id commands per id bucket and, when the schema defines
+ * {@link VersionInfo#VERSION_FIELD}, stamps them with a new version before passing them on.
+ */
+public class VersionProcessorFactory extends UpdateRequestProcessorFactory
+{
+  @Override
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) 
+  {
+    // TODO: return null if there is not a unique id defined?
+    return new VersionProcessor(req, next);
+  }
+}
+// this is a separate class from DistribUpdateProcessor only to facilitate
+// working on that in parallel.  Given the dependencies, it will most likely make more sense for it to be merged.
+// For example, if not leader, forward to leader.  if leader, determine version, then send to replicas
+/**
+ * Serializes add and delete-by-id commands per id-hash bucket and, when the schema defines
+ * {@link VersionInfo#VERSION_FIELD}, stamps each of them with a new version from the
+ * {@link VersionInfo} clock. Delete-by-query and all other commands pass through unchanged.
+ */
+class VersionProcessor extends UpdateRequestProcessor
+{
+  private final SolrQueryRequest req;
+  private final UpdateHandler updateHandler;
+  private final UpdateLog ulog;
+  // Null when there is no update log or the log does not provide version info; in that
+  // case this processor is a pure pass-through.
+  private final VersionInfo vinfo;
+  // True only when vinfo exists and the schema actually has a version field.
+  private final boolean versionsStored;
+
+  public VersionProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
+    super( next );
+    this.req = req;
+    this.updateHandler = req.getCore().getUpdateHandler();
+    this.ulog = updateHandler.getUpdateLog();
+    // A handler without an update log is treated like "no version info" (pass-through)
+    // instead of failing every request with a NullPointerException.
+    this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+    versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
+  }
+
+  // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And make the hash pluggable of course.
+  /** Hashes an indexed id with MurmurHash3 (seed 0); shared by the add and delete paths. */
+  private static int hashIndexedId(BytesRef br) {
+    return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
+  }
+  private int hash(AddUpdateCommand cmd) {
+    return hashIndexedId(cmd.getIndexedId());
+  }
+  private int hash(DeleteUpdateCommand cmd) {
+    return hashIndexedId(cmd.getIndexedId());
+  }
+
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    if (vinfo == null) {
+      super.processAdd(cmd);
+      return;
+    }
+
+    VersionBucket bucket = vinfo.bucket(hash(cmd));
+    synchronized (bucket) {
+      // we obtain the version when synchronized and then do the add so we can ensure that
+      // if version1 < version2 then version1 is actually added before version2.
+
+      // even if we don't store the version field, synchronizing on the bucket
+      // will enable us to know what version happened first, and thus enable
+      // realtime-get to work reliably.
+      // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+      // there may be other reasons in the future for a version on the commands
+      if (versionsStored) {
+        long version = vinfo.getNewClock();
+        cmd.setVersion(version);
+        cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+      }
+
+      super.processAdd(cmd);
+    }
+  }
+
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    if (vinfo == null) {
+      super.processDelete(cmd);
+      return;
+    }
+
+    if (cmd.id == null) {
+      // delete-by-query
+      // TODO: forward to all nodes in distrib mode?  or just don't bother to support?
+      super.processDelete(cmd);
+      return;
+    }
+
+    // Same bucket lock as processAdd so adds and deletes of one id are ordered together.
+    VersionBucket bucket = vinfo.bucket(hash(cmd));
+    synchronized (bucket) {
+      if (versionsStored) {
+        long version = vinfo.getNewClock();
+        cmd.setVersion(version);
+      }
+      super.processDelete(cmd);
+    }
+  }
+
+  @Override
+  public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    super.processMergeIndexes(cmd);
+  }
+
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException
+  {
+    super.processCommit(cmd);
+  }
+
+  @Override
+  public void processRollback(RollbackUpdateCommand cmd) throws IOException
+  {
+    super.processRollback(cmd);
+  }
+}
+
+

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java Fri Oct  7 02:04:04 2011
@@ -239,4 +239,59 @@ public class Hash {
     return c + (((long)b) << 32);
   }
 
+
+  /**
+   * Computes the 32-bit x86 flavour of MurmurHash3 over {@code len} bytes of {@code data},
+   * beginning at index {@code offset}.
+   * Original source/tests at https://github.com/yonik/java_util/
+   *
+   * @param data   source bytes
+   * @param offset index of the first byte to hash
+   * @param len    number of bytes to hash
+   * @param seed   initial hash state
+   * @return the 32-bit hash
+   */
+  public static int murmurhash3_x86_32(byte[] data, int offset, int len, int seed) {
+    final int c1 = 0xcc9e2d51;
+    final int c2 = 0x1b873593;
+
+    int hash = seed;
+    final int blockEnd = offset + (len & ~3);  // end (exclusive) of the whole 4-byte blocks
+
+    // body: mix each little-endian 32-bit word into the state
+    int pos = offset;
+    while (pos < blockEnd) {
+      int word = (data[pos] & 0xff)
+          | ((data[pos + 1] & 0xff) << 8)
+          | ((data[pos + 2] & 0xff) << 16)
+          | (data[pos + 3] << 24);  // top byte needs no mask: its sign bits are shifted out
+      word *= c1;
+      word = Integer.rotateLeft(word, 15);
+      word *= c2;
+
+      hash ^= word;
+      hash = Integer.rotateLeft(hash, 13);
+      hash = hash * 5 + 0xe6546b64;
+      pos += 4;
+    }
+
+    // tail: fold in the final 1-3 bytes, if any
+    final int remaining = len & 3;
+    if (remaining != 0) {
+      int tailWord = 0;
+      if (remaining == 3) {
+        tailWord |= (data[blockEnd + 2] & 0xff) << 16;
+      }
+      if (remaining >= 2) {
+        tailWord |= (data[blockEnd + 1] & 0xff) << 8;
+      }
+      tailWord |= data[blockEnd] & 0xff;
+      tailWord *= c1;
+      tailWord = Integer.rotateLeft(tailWord, 15);
+      tailWord *= c2;
+      hash ^= tailWord;
+    }
+
+    // finalization: fold in the length, then apply the fmix32 avalanche
+    hash ^= len;
+    hash ^= hash >>> 16;
+    hash *= 0x85ebca6b;
+    hash ^= hash >>> 13;
+    hash *= 0xc2b2ae35;
+    hash ^= hash >>> 16;
+    return hash;
+  }
+
 }