You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/10/07 04:04:05 UTC
svn commit: r1179922 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
solrj/src/java/org/apache/solr/common/util/
Author: yonik
Date: Fri Oct 7 02:04:04 2011
New Revision: 1179922
URL: http://svn.apache.org/viewvc?rev=1179922&view=rev
Log:
SOLR-2816: start versioning
Added:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (with props)
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java (with props)
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Fri Oct 7 02:04:04 2011
@@ -42,10 +42,7 @@ import org.apache.solr.search.ValueSourc
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.processor.LogUpdateProcessorFactory;
-import org.apache.solr.update.processor.RunUpdateProcessorFactory;
-import org.apache.solr.update.processor.UpdateRequestProcessorChain;
-import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.apache.solr.update.processor.*;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -661,6 +658,7 @@ public final class SolrCore implements S
// construct the default chain
UpdateRequestProcessorFactory[] factories = new UpdateRequestProcessorFactory[]{
new LogUpdateProcessorFactory(),
+ new VersionProcessorFactory(),
new RunUpdateProcessorFactory()
};
def = new UpdateRequestProcessorChain(factories, this);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Fri Oct 7 02:04:04 2011
@@ -17,6 +17,7 @@
package org.apache.solr.update;
+import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -26,6 +27,7 @@ import org.apache.solr.common.util.JavaB
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.SchemaField;
import java.io.*;
import java.nio.ByteBuffer;
@@ -33,6 +35,7 @@ import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/** @lucene.experimental */
class NullUpdateLog extends UpdateLog {
@@ -80,6 +83,11 @@ class NullUpdateLog extends UpdateLog {
@Override
public void close() {
}
+
+ @Override
+ public VersionInfo getVersionInfo() {
+ return null;
+ }
}
/** @lucene.experimental */
@@ -105,6 +113,12 @@ public class FSUpdateLog extends UpdateL
private String dataDir;
private String lastDataDir;
+ private VersionInfo versionInfo;
+ @Override
+ public VersionInfo getVersionInfo() {
+ return versionInfo;
+ }
+
@Override
public void init(PluginInfo info) {
dataDir = (String)info.initArgs.get("dir");
@@ -124,6 +138,8 @@ public class FSUpdateLog extends UpdateL
tlogDir.mkdirs();
tlogFiles = getLogList(tlogDir);
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
+
+ versionInfo = new VersionInfo(uhandler, 64);
}
static class LogPtr {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Fri Oct 7 02:04:04 2011
@@ -27,6 +27,7 @@ import org.apache.solr.request.SolrQuery
public class UpdateCommand {
protected final SolrQueryRequest req;
protected final String commandName;
+ protected long version;
public UpdateCommand(String commandName, SolrQueryRequest req) {
this.req = req;
@@ -37,4 +38,11 @@ import org.apache.solr.request.SolrQuery
public String toString() {
return commandName;
}
+
+ public long getVersion() {
+ return version;
+ }
+ public void setVersion(long version) {
+ this.version = version;
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Oct 7 02:04:04 2011
@@ -37,4 +37,5 @@ public abstract class UpdateLog implemen
public abstract void postSoftCommit(CommitUpdateCommand cmd);
public abstract Object lookup(BytesRef indexedId);
public abstract void close();
+ public abstract VersionInfo getVersionInfo();
}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java Fri Oct 7 02:04:04 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+// TODO: make inner?
+// TODO: store the highest possible in the index on a commit (but how to not block adds?)
+// TODO: could also store highest possible in the transaction log after a commit.
+// Or on a new index, just scan "version" for the max?
+public class VersionBucket {
+ long highest;
+}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Fri Oct 7 02:04:04 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util.BitUtil;
+import org.apache.solr.schema.SchemaField;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VersionInfo {
+ public static final String VERSION_FIELD="_version_";
+
+ private final VersionBucket[] buckets;
+ private SchemaField versionField;
+
+ public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
+ versionField = updateHandler.core.getSchema().getFieldOrNull("_version_");
+ buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
+ for (int i=0; i<buckets.length; i++) {
+ buckets[i] = new VersionBucket();
+ }
+ }
+
+ public SchemaField getVersionField() {
+ return versionField;
+ }
+
+
+ // todo: initialize... use current time to start?
+ // a clock that increments by 1 for every operation makes it easier to detect missing
+ // messages, but raises other issues:
+ // - need to initialize to largest thing in index or tlog
+ // - when becoming leader, need to make sure it's greater than the previous leader's clock (TODO: confirm intent)
+ // - using to detect missing messages means we need to keep track per-leader, or make
+ // sure a new leader starts off with 1 greater than the last leader.
+ private final AtomicLong clock = new AtomicLong();
+
+ public long getNewClock() {
+ return clock.incrementAndGet();
+ }
+
+ // Named *old* to prevent accidentally calling a "getClock" and expecting a new, updated clock.
+ public long getOldClock() {
+ return clock.get();
+ }
+
+ /***
+ // Time-based lamport clock. Good for introducing some reality into clocks (to the degree
+ // that times are somewhat synchronized in the cluster).
+ // Good if we want to relax some constraints to scale down to where only one node may be
+ // up at a time. Harder to detect missing messages (because versions are not contiguous).
+ long vclock;
+ long time;
+ private final Object clockSync = new Object();
+
+
+ public long getNewClock() {
+ synchronized (clockSync) {
+ time = System.currentTimeMillis();
+ long result = time << 20;
+ if (result <= vclock) {
+ result = vclock + 1;
+ }
+ vclock = result;
+ return vclock;
+ }
+ }
+
+ public long getOldClock() {
+ synchronized (clockSync) {
+ return vclock;
+ }
+ }
+
+ public void updateClock(long clock) {
+ synchronized (clockSync) {
+ vclock = Math.max(vclock, clock);
+ }
+ }
+ ***/
+
+ public VersionBucket bucket(int hash) {
+ // If this is a user provided hash, it may be poor in the right-hand bits.
+ // Make sure high bits are moved down, since only the low bits will matter.
+ // int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
+ // Assume good hash codes for now.
+
+ int slot = hash & (buckets.length-1);
+ return buckets[slot];
+ }
+
+}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java?rev=1179922&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java Fri Oct 7 02:04:04 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.*;
+
+
+/**
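+ * Creates {@link VersionProcessor} instances, which set a new version on add and delete-by-id commands (when a _version_ field is defined) before passing them to the next processor.
+ *
+ * @since solrcloud branch, r1179922 (TODO: confirm release version; the earlier "solr 1.3" tag predates this class)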
+ */
+public class VersionProcessorFactory extends UpdateRequestProcessorFactory
+{
+ @Override
+ public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)
+ {
+ // TODO: return null if there is not a unique id defined?
+ return new VersionProcessor(req, next);
+ }
+}
+// this is a separate class from DistribUpdateProcessor only to facilitate
+// working on that in parallel. Given the dependencies, it will most likely make more sense for it to be merged.
+// For example, if not leader, forward to leader. if leader, determine version, then send to replicas
+class VersionProcessor extends UpdateRequestProcessor
+{
+ private final SolrQueryRequest req;
+ private final UpdateHandler updateHandler;
+ private final UpdateLog ulog;
+ private final VersionInfo vinfo;
+ private final boolean versionsStored;
+
+ public VersionProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
+ super( next );
+ this.req = req;
+ this.updateHandler = req.getCore().getUpdateHandler();
+ this.ulog = updateHandler.getUpdateLog();
+ this.vinfo = ulog.getVersionInfo();
+ versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
+ }
+
+ // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it?
+ // TODO: the hash also needs to be pluggable.
+ private int hash(AddUpdateCommand cmd) {
+ BytesRef br = cmd.getIndexedId();
+ return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
+ }
+ private int hash(DeleteUpdateCommand cmd) {
+ BytesRef br = cmd.getIndexedId();
+ return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
+ }
+
+ @Override
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ if (vinfo == null) {
+ super.processAdd(cmd);
+ return;
+ }
+
+ VersionBucket bucket = vinfo.bucket(hash(cmd));
+ synchronized (bucket) {
+ // we obtain the version when synchronized and then do the add so we can ensure that
+ // if version1 < version2 then version1 is actually added before version2.
+
+ // even if we don't store the version field, synchronizing on the bucket
+ // will enable us to know what version happened first, and thus enable
+ // realtime-get to work reliably.
+ // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+ // there may be other reasons in the future for a version on the commands
+ if (versionsStored) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(version);
+ cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+ }
+
+ super.processAdd(cmd);
+ }
+ }
+
+ @Override
+ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ if (vinfo == null) {
+ super.processDelete(cmd);
+ return;
+ }
+
+ if (cmd.id == null) {
+ // delete-by-query
+ // TODO: forward to all nodes in distrib mode? or just don't bother to support?
+ super.processDelete(cmd);
+ return;
+ }
+
+ VersionBucket bucket = vinfo.bucket(hash(cmd));
+ synchronized (bucket) {
+ if (versionsStored) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(version);
+ }
+ super.processDelete(cmd);
+ }
+ }
+
+ @Override
+ public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+ super.processMergeIndexes(cmd);
+ }
+
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException
+ {
+ super.processCommit(cmd);
+ }
+
+ /**
+ * @since Solr 1.4
+ */
+ @Override
+ public void processRollback(RollbackUpdateCommand cmd) throws IOException
+ {
+ super.processRollback(cmd);
+ }
+}
+
+
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java?rev=1179922&r1=1179921&r2=1179922&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/Hash.java Fri Oct 7 02:04:04 2011
@@ -239,4 +239,59 @@ public class Hash {
return c + (((long)b) << 32);
}
+
+ /** Returns the MurmurHash3_x86_32 hash.
+ * Original source/tests at https://github.com/yonik/java_util/
+ */
+ public static int murmurhash3_x86_32(byte[] data, int offset, int len, int seed) {
+
+ final int c1 = 0xcc9e2d51;
+ final int c2 = 0x1b873593;
+
+ int h1 = seed;
+ int roundedEnd = offset + (len & 0xfffffffc); // round down to 4 byte block
+
+ for (int i=offset; i<roundedEnd; i+=4) {
+ // little endian load order
+ int k1 = (data[i] & 0xff) | ((data[i+1] & 0xff) << 8) | ((data[i+2] & 0xff) << 16) | (data[i+3] << 24);
+ k1 *= c1;
+ k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15);
+ k1 *= c2;
+
+ h1 ^= k1;
+ h1 = (h1 << 13) | (h1 >>> 19); // ROTL32(h1,13);
+ h1 = h1*5+0xe6546b64;
+ }
+
+ // tail
+ int k1 = 0;
+
+ switch(len & 0x03) {
+ case 3:
+ k1 = (data[roundedEnd + 2] & 0xff) << 16;
+ // fallthrough
+ case 2:
+ k1 |= (data[roundedEnd + 1] & 0xff) << 8;
+ // fallthrough
+ case 1:
+ k1 |= (data[roundedEnd] & 0xff);
+ k1 *= c1;
+ k1 = (k1 << 15) | (k1 >>> 17); // ROTL32(k1,15);
+ k1 *= c2;
+ h1 ^= k1;
+ }
+
+ // finalization
+ h1 ^= len;
+
+ // fmix(h1);
+ h1 ^= h1 >>> 16;
+ h1 *= 0x85ebca6b;
+ h1 ^= h1 >>> 13;
+ h1 *= 0xc2b2ae35;
+ h1 ^= h1 >>> 16;
+
+ return h1;
+ }
+
}