You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/24 21:49:21 UTC
[2/6] ACCUMULO-1000 added conditional mutations to Accumulo
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
index ce84054..6f6a9ee 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.hadoop.io.Text;
public class ByteBufferUtil {
@@ -76,4 +77,16 @@ public class ByteBufferUtil {
public static String toString(ByteBuffer bytes) {
return new String(bytes.array(), bytes.position(), bytes.remaining());
}
+
+ public static ByteBuffer toByteBuffers(ByteSequence bs) {
+ if (bs == null)
+ return null;
+
+ if (bs.isBackedByArray()) {
+ return ByteBuffer.wrap(bs.getBackingArray(), bs.offset(), bs.length());
+ } else {
+ // TODO create more efficient impl
+ return ByteBuffer.wrap(bs.toArray());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
new file mode 100644
index 0000000..b640581
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ *
+ */
+public class UnsynchronizedBuffer {
+ // created this little class instead of using ByteArrayOutput stream and DataOutputStream
+ // because both are synchronized... lots of small syncs slow things down
+ public static class Writer {
+
+ int offset = 0;
+ byte data[];
+
+ public Writer() {
+ data = new byte[64];
+ }
+
+ public Writer(int initialCapacity) {
+ data = new byte[initialCapacity];
+ }
+
+ private void reserve(int l) {
+ if (offset + l > data.length) {
+ int newSize = UnsynchronizedBuffer.nextArraySize(offset + l);
+
+ byte[] newData = new byte[newSize];
+ System.arraycopy(data, 0, newData, 0, offset);
+ data = newData;
+ }
+
+ }
+
+ public void add(byte[] bytes, int off, int length) {
+ reserve(length);
+ System.arraycopy(bytes, off, data, offset, length);
+ offset += length;
+ }
+
+ public void add(boolean b) {
+ reserve(1);
+ if (b)
+ data[offset++] = 1;
+ else
+ data[offset++] = 0;
+ }
+
+ public byte[] toArray() {
+ byte ret[] = new byte[offset];
+ System.arraycopy(data, 0, ret, 0, offset);
+ return ret;
+ }
+
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(data, 0, offset);
+ }
+
+ public void writeVInt(int i) {
+ writeVLong(i);
+ }
+
+ public void writeVLong(long i) {
+ reserve(9);
+ if (i >= -112 && i <= 127) {
+ data[offset++] = (byte) i;
+ return;
+ }
+
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement'
+ len = -120;
+ }
+
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ data[offset++] = (byte) len;
+
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftbits;
+ data[offset++] = (byte) ((i & mask) >> shiftbits);
+ }
+ }
+ }
+
+ public static class Reader {
+ int offset;
+ byte data[];
+
+ public Reader(byte b[]) {
+ this.data = b;
+ }
+
+ public Reader(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ offset = buffer.arrayOffset();
+ data = buffer.array();
+ } else {
+ data = new byte[buffer.remaining()];
+ buffer.get(data);
+ }
+ }
+
+ public int readInt() {
+ return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0);
+ }
+
+ public long readLong() {
+ return (((long) data[offset++] << 56) + ((long) (data[offset++] & 255) << 48) + ((long) (data[offset++] & 255) << 40)
+ + ((long) (data[offset++] & 255) << 32) + ((long) (data[offset++] & 255) << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0));
+ }
+
+ public void readBytes(byte b[]) {
+ System.arraycopy(data, offset, b, 0, b.length);
+ offset += b.length;
+ }
+
+ public boolean readBoolean() {
+ return (data[offset++] == 1);
+ }
+
+ public int readVInt() {
+ return (int) readVLong();
+ }
+
+ public long readVLong() {
+ byte firstByte = data[offset++];
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len - 1; idx++) {
+ byte b = data[offset++];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+ }
+ }
+
+ /**
+ * Determines what next array size should be by rounding up to next power of two.
+ *
+ */
+ public static int nextArraySize(int i) {
+ if (i < 0)
+ throw new IllegalArgumentException();
+
+ if (i > (1 << 30))
+ return Integer.MAX_VALUE; // this is the next power of 2 minus one... a special case
+
+ if (i == 0) {
+ return 1;
+ }
+
+ // round up to next power of two
+ int ret = i;
+ ret--;
+ ret |= ret >> 1;
+ ret |= ret >> 2;
+ ret |= ret >> 4;
+ ret |= ret >> 8;
+ ret |= ret >> 16;
+ ret++;
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/thrift/data.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift
index 4e2d3e6..ae6f439 100644
--- a/core/src/main/thrift/data.thrift
+++ b/core/src/main/thrift/data.thrift
@@ -110,10 +110,46 @@ struct UpdateErrors {
3:map<TKeyExtent, client.SecurityErrorCode> authorizationFailures
}
+enum TCMStatus {
+ ACCEPTED,
+ REJECTED,
+ VIOLATED,
+ IGNORED
+}
+
+struct TCMResult {
+ 1:i64 cmid,
+ 2:TCMStatus status
+}
+
struct MapFileInfo {
1:i64 estimatedSize
}
+struct TCondition {
+ 1:binary cf;
+ 2:binary cq;
+ 3:binary cv;
+ 4:i64 ts;
+ 5:bool hasTimestamp;
+ 6:binary val;
+ 7:binary iterators;
+}
+
+struct TConditionalMutation {
+ 1:list<TCondition> conditions;
+ 2:TMutation mutation;
+ 3:i64 id;
+}
+
+struct TConditionalSession {
+ 1:i64 sessionId;
+ 2:string tserverLock;
+ 3:i64 ttl;
+}
+
+typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
+
typedef map<TKeyExtent,list<TMutation>> UpdateBatch
typedef map<TKeyExtent, map<string, MapFileInfo>> TabletFiles
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index e6adbf4..4f9f13a 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -160,13 +160,21 @@ service TabletClientService extends client.ClientService {
data.UpdateID startUpdate(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec),
oneway void applyUpdates(1:trace.TInfo tinfo, 2:data.UpdateID updateID, 3:data.TKeyExtent keyExtent, 4:list<data.TMutation> mutations),
data.UpdateErrors closeUpdate(2:trace.TInfo tinfo, 1:data.UpdateID updateID) throws (1:NoSuchScanIDException nssi),
-
+
//the following call supports making a single update to a tablet
void update(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:data.TKeyExtent keyExtent, 3:data.TMutation mutation)
throws (1:client.ThriftSecurityException sec,
2:NotServingTabletException nste,
3:ConstraintViolationException cve),
+
+ data.TConditionalSession startConditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:string tableID)
+ throws (1:client.ThriftSecurityException sec);
+ list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID, 3:data.CMBatch mutations, 4:list<string> symbols)
+ throws (1:NoSuchScanIDException nssi);
+ void invalidateConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
+ oneway void closeConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
+
// on success, returns an empty list
list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 0a34575..fe2f09c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -219,7 +219,7 @@ public class TabletLocatorImplTest extends TestCase {
}
private void runTest(TabletLocatorImpl metaCache, List<Mutation> ml, Map<String,Map<KeyExtent,List<String>>> emb, String... efailures) throws Exception {
- Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
+ Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
List<Mutation> afailures = new ArrayList<Mutation>();
metaCache.binMutations(ml, binnedMutations, afailures, credential);
@@ -239,11 +239,11 @@ public class TabletLocatorImplTest extends TestCase {
}
- private void verify(Map<String,Map<KeyExtent,List<String>>> expected, Map<String,TabletServerMutations> actual) {
+ private void verify(Map<String,Map<KeyExtent,List<String>>> expected, Map<String,TabletServerMutations<Mutation>> actual) {
assertEquals(expected.keySet(), actual.keySet());
for (String server : actual.keySet()) {
- TabletServerMutations atb = actual.get(server);
+ TabletServerMutations<Mutation> atb = actual.get(server);
Map<KeyExtent,List<String>> etb = expected.get(server);
assertEquals(etb.keySet(), atb.getMutations().keySet());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index bf577c1..1608576 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.util.UnsynchronizedBuffer;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,25 +40,25 @@ public class RelativeKeyTest {
@Test
public void testBasicRelativeKey() {
- assertEquals(1, RelativeKey.nextArraySize(0));
- assertEquals(1, RelativeKey.nextArraySize(1));
- assertEquals(2, RelativeKey.nextArraySize(2));
- assertEquals(4, RelativeKey.nextArraySize(3));
- assertEquals(4, RelativeKey.nextArraySize(4));
- assertEquals(8, RelativeKey.nextArraySize(5));
- assertEquals(8, RelativeKey.nextArraySize(8));
- assertEquals(16, RelativeKey.nextArraySize(9));
+ assertEquals(1, UnsynchronizedBuffer.nextArraySize(0));
+ assertEquals(1, UnsynchronizedBuffer.nextArraySize(1));
+ assertEquals(2, UnsynchronizedBuffer.nextArraySize(2));
+ assertEquals(4, UnsynchronizedBuffer.nextArraySize(3));
+ assertEquals(4, UnsynchronizedBuffer.nextArraySize(4));
+ assertEquals(8, UnsynchronizedBuffer.nextArraySize(5));
+ assertEquals(8, UnsynchronizedBuffer.nextArraySize(8));
+ assertEquals(16, UnsynchronizedBuffer.nextArraySize(9));
- assertEquals(1 << 16, RelativeKey.nextArraySize((1 << 16) - 1));
- assertEquals(1 << 16, RelativeKey.nextArraySize(1 << 16));
- assertEquals(1 << 17, RelativeKey.nextArraySize((1 << 16) + 1));
+ assertEquals(1 << 16, UnsynchronizedBuffer.nextArraySize((1 << 16) - 1));
+ assertEquals(1 << 16, UnsynchronizedBuffer.nextArraySize(1 << 16));
+ assertEquals(1 << 17, UnsynchronizedBuffer.nextArraySize((1 << 16) + 1));
- assertEquals(1 << 30, RelativeKey.nextArraySize((1 << 30) - 1));
+ assertEquals(1 << 30, UnsynchronizedBuffer.nextArraySize((1 << 30) - 1));
- assertEquals(1 << 30, RelativeKey.nextArraySize(1 << 30));
+ assertEquals(1 << 30, UnsynchronizedBuffer.nextArraySize(1 << 30));
- assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE - 1));
- assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE));
+ assertEquals(Integer.MAX_VALUE, UnsynchronizedBuffer.nextArraySize(Integer.MAX_VALUE - 1));
+ assertEquals(Integer.MAX_VALUE, UnsynchronizedBuffer.nextArraySize(Integer.MAX_VALUE));
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java b/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
new file mode 100644
index 0000000..7487ba3
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+
+/**
+ *
+ */
+public class ServerConditionalMutation extends ServerMutation {
+
+ public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> {
+ @Override
+ public ServerConditionalMutation translate(TConditionalMutation input) {
+ return new ServerConditionalMutation(input);
+ }
+ }
+
+ public static final TCMTranslator TCMT = new TCMTranslator();
+
+ private long cmid;
+ private List<TCondition> conditions;
+
+ public ServerConditionalMutation(TConditionalMutation input) {
+ super(input.mutation);
+
+ this.cmid = input.id;
+ this.conditions = input.conditions;
+ }
+
+ public long getID() {
+ return cmid;
+ }
+
+ public List<TCondition> getConditions() {
+ return conditions;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 2b98331..cebc338 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -328,6 +328,13 @@ public class SecurityOperation {
return hasTablePermission(credentials, table, TablePermission.WRITE, true);
}
+ public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException {
+
+ authenticate(credentials);
+
+ return hasTablePermission(credentials, tableID, TablePermission.WRITE, true) && hasTablePermission(credentials, tableID, TablePermission.READ, true);
+ }
+
public boolean canSplitTablet(TCredentials credentials, String table) throws ThriftSecurityException {
authenticate(credentials);
return hasSystemPermission(credentials, SystemPermission.ALTER_TABLE, false) || hasSystemPermission(credentials, SystemPermission.SYSTEM, false)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
new file mode 100644
index 0000000..c25e729
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ *
+ */
+public class ConditionalMutationSet {
+
+ static interface DeferFilter {
+ void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
+ }
+
+ static class DuplicateFitler implements DeferFilter {
+ public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+ okMutations.add(scml.get(0));
+ for (int i = 1; i < scml.size(); i++) {
+ if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
+ deferred.add(scml.get(i));
+ } else {
+ okMutations.add(scml.get(i));
+ }
+ }
+ }
+ }
+
+ static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+ List<ServerConditionalMutation> scml = entry.getValue();
+ List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
+ List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
+ filter.defer(scml, okMutations, deferred);
+
+ if (deferred.size() > 0) {
+ scml.clear();
+ scml.addAll(okMutations);
+ List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
+ if (l == null) {
+ l = deferred;
+ deferredMutations.put(entry.getKey(), l);
+ } else {
+ l.addAll(deferred);
+ }
+
+ }
+ }
+ }
+
+ static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+ defer(updates, deferred, new DuplicateFitler());
+ }
+
+ static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+ // TODO check if its already in sorted order?
+ // TODO maybe the potential benefit of sorting is not worth the cost
+ Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
+ @Override
+ public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
+ return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
new file mode 100644
index 0000000..f057ca3
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.accumulo.server.tabletserver.ConditionalMutationSet.DeferFilter;
+
+/**
+ *
+ */
+class RowLocks {
+
+ private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
+
+ static class RowLock {
+ ReentrantLock rlock;
+ int count;
+ ByteSequence rowSeq;
+
+ RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
+ this.rlock = rlock;
+ this.count = 0;
+ this.rowSeq = rowSeq;
+ }
+
+ public boolean tryLock() {
+ return rlock.tryLock();
+ }
+
+ public void lock() {
+ rlock.lock();
+ }
+
+ public void unlock() {
+ rlock.unlock();
+ }
+ }
+
+ private RowLock getRowLock(ArrayByteSequence rowSeq) {
+ RowLock lock = rowLocks.get(rowSeq);
+ if (lock == null) {
+ lock = new RowLock(new ReentrantLock(), rowSeq);
+ rowLocks.put(rowSeq, lock);
+ }
+
+ lock.count++;
+ return lock;
+ }
+
+ private void returnRowLock(RowLock lock) {
+ if (lock.count == 0)
+ throw new IllegalStateException();
+ lock.count--;
+
+ if (lock.count == 0) {
+ rowLocks.remove(lock.rowSeq);
+ }
+ }
+
+ List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+ ArrayList<RowLock> locks = new ArrayList<RowLock>();
+
+ // assume that mutations are in sorted order to avoid deadlock
+ synchronized (rowLocks) {
+ for (List<ServerConditionalMutation> scml : updates.values()) {
+ for (ServerConditionalMutation scm : scml) {
+ locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
+ }
+ }
+ }
+
+ HashSet<ByteSequence> rowsNotLocked = null;
+
+ // acquire as many locks as possible, not blocking on rows that are already locked
+ if (locks.size() > 1) {
+ for (RowLock rowLock : locks) {
+ if (!rowLock.tryLock()) {
+ if (rowsNotLocked == null)
+ rowsNotLocked = new HashSet<ByteSequence>();
+ rowsNotLocked.add(rowLock.rowSeq);
+ }
+ }
+ } else {
+ // if there is only one lock, then wait for it
+ locks.get(0).lock();
+ }
+
+ if (rowsNotLocked != null) {
+
+ final HashSet<ByteSequence> rnlf = rowsNotLocked;
+ // assume will get locks needed, do something expensive otherwise
+ ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
+ @Override
+ public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+ for (ServerConditionalMutation scm : scml) {
+ if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
+ deferred.add(scm);
+ else
+ okMutations.add(scm);
+
+ }
+ }
+ });
+
+ ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
+ ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
+ for (RowLock rowLock : locks) {
+ if (rowsNotLocked.contains(rowLock.rowSeq)) {
+ locksToReturn.add(rowLock);
+ } else {
+ filteredLocks.add(rowLock);
+ }
+ }
+
+ synchronized (rowLocks) {
+ for (RowLock rowLock : locksToReturn) {
+ returnRowLock(rowLock);
+ }
+ }
+
+ locks = filteredLocks;
+ }
+ return locks;
+ }
+
+ void releaseRowLocks(List<RowLock> locks) {
+ for (RowLock rowLock : locks) {
+ rowLock.unlock();
+ }
+
+ synchronized (rowLocks) {
+ for (RowLock rowLock : locks) {
+ returnRowLock(rowLock);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index e9b973a..01b0dc2 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -1651,7 +1651,7 @@ public class Tablet {
}
}
- private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
+ private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
// log.info("In nextBatch..");
@@ -1739,7 +1739,7 @@ public class Tablet {
public long numBytes;
}
- Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
+ Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
@@ -1873,14 +1873,14 @@ public class Tablet {
// scan options
Authorizations authorizations;
byte[] defaultLabels;
- HashSet<Column> columnSet;
+ Set<Column> columnSet;
List<IterInfo> ssiList;
Map<String,Map<String,String>> ssio;
AtomicBoolean interruptFlag;
int num;
boolean isolated;
- ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+ ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
this.num = num;
this.authorizations = authorizations;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 7425fed..ccb95fc 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -65,6 +65,8 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.CompressedIterators;
+import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.TabletType;
import org.apache.accumulo.core.client.impl.Translator;
@@ -87,7 +89,12 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.data.thrift.MultiScanResult;
import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
import org.apache.accumulo.core.data.thrift.TKey;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
@@ -140,6 +147,7 @@ import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -158,6 +166,7 @@ import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
@@ -336,12 +345,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
SecureRandom random;
Map<Long,Session> sessions;
+ long maxIdle;
SessionManager(AccumuloConfiguration conf) {
random = new SecureRandom();
sessions = new HashMap<Long,Session>();
- final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+ maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
Runnable r = new Runnable() {
@Override
@@ -369,6 +379,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return sid;
}
+ long getMaxIdleTime() {
+ return maxIdle;
+ }
+
/**
* while a session is reserved, it cannot be canceled or removed
*
@@ -387,9 +401,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
+ synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while(wait && session.reserved){
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
synchronized void unreserveSession(Session session) {
if (!session.reserved)
throw new IllegalStateException();
+ notifyAll();
session.reserved = false;
session.lastAccessTime = System.currentTimeMillis();
}
@@ -399,7 +434,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (session != null)
unreserveSession(session);
}
-
+
synchronized Session getSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null)
@@ -408,9 +443,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ Session removeSession(long sessionId, boolean unreserve) {
Session session = null;
synchronized (this) {
session = sessions.remove(sessionId);
+ if(unreserve && session != null)
+ unreserveSession(session);
}
// do clean up out side of lock..
@@ -709,6 +750,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
+ private static class ConditionalSession extends Session {
+ public TCredentials credentials;
+ public Authorizations auths;
+ public String tableId;
+ public AtomicBoolean interruptFlag;
+
+ @Override
+ public void cleanup() {
+ interruptFlag.set(true);
+ }
+ }
+
private static class UpdateSession extends Session {
public Tablet currentTablet;
public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -858,6 +911,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
WriteTracker writeTracker = new WriteTracker();
+ private RowLocks rowLocks = new RowLocks();
+
ThriftClientHandler() {
super(instance, watcher);
log.debug(ThriftClientHandler.class.getName() + " created");
@@ -1687,6 +1742,250 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
writeTracker.finishWrite(opid);
}
}
+
+ private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+ List<String> symbols) throws IOException {
+ Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
+
+ CompressedIterators compressedIters = new CompressedIterators(symbols);
+
+ while (iter.hasNext()) {
+ Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
+ Tablet tablet = onlineTablets.get(entry.getKey());
+
+ if (tablet == null || tablet.isClosed()) {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ iter.remove();
+ } else {
+ List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+
+ for (ServerConditionalMutation scm : entry.getValue()) {
+ if (checkCondition(results, cs, compressedIters, tablet, scm))
+ okMutations.add(scm);
+ }
+
+ entry.setValue(okMutations);
+ }
+
+ }
+ }
+
+ boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters,
+ Tablet tablet, ServerConditionalMutation scm) throws IOException {
+ boolean add = true;
+
+ Set<Column> emptyCols = Collections.emptySet();
+
+ for(TCondition tc : scm.getConditions()){
+
+ Range range;
+ if (tc.hasTimestamp)
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+ else
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+
+ IterConfig ic = compressedIters.decompress(tc.iterators);
+
+ Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+
+ try {
+ ScanBatch batch = scanner.read();
+
+ Value val = null;
+
+ for (KVEntry entry2 : batch.results) {
+ val = entry2.getValue();
+ break;
+ }
+
+ if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+ add = false;
+ break;
+ }
+
+ } catch (TabletClosedException e) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ } catch (IterationInterruptedException iie) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ } catch (TooManyFilesException tmfe) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ add = false;
+ break;
+ }
+ }
+ return add;
+ }
+
+ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
+ Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+
+ boolean sessionCanceled = sess.interruptFlag.get();
+
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+ Tablet tablet = onlineTablets.get(entry.getKey());
+ if (tablet == null || tablet.isClosed() || sessionCanceled) {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ try {
+
+ List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+ if (mutations.size() > 0) {
+
+ CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
+
+ if (cs == null) {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+ sendables.put(cs, mutations);
+ }
+ }
+ } catch (TConstraintViolationException e) {
+ if (e.getNonViolators().size() > 0) {
+ sendables.put(e.getCommitSession(), e.getNonViolators());
+ for (Mutation m : e.getNonViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+ }
+
+ for (Mutation m : e.getViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+ }
+ }
+ }
+
+ while (true && sendables.size() > 0) {
+ try {
+ logger.logManyTablets(sendables);
+ break;
+ } catch (IOException ex) {
+ log.warn("logging mutations failed, retrying");
+ } catch (FSError ex) { // happens when DFS is localFS
+ log.warn("logging mutations failed, retrying");
+ } catch (Throwable t) {
+ log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+ CommitSession commitSession = entry.getKey();
+ List<Mutation> mutations = entry.getValue();
+
+ commitSession.commit(mutations);
+ }
+
+ }
+
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+ ArrayList<TCMResult> results, List<String> symbols) throws IOException {
+ // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
+ ConditionalMutationSet.sortConditionalMutations(updates);
+
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+
+ // can not process two mutations for the same row, because one will not see what the other writes
+ ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
+
+ // get as many locks as possible w/o blocking... defer any rows that are locked
+ List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+ try {
+ checkConditions(updates, results, cs, symbols);
+ writeConditionalMutations(updates, results, cs);
+ } finally {
+ rowLocks.releaseRowLocks(locks);
+ }
+ return deferred;
+ }
+
+ @Override
+ public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+ throws ThriftSecurityException, TException {
+
+ Authorizations userauths = null;
+ if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+ userauths = security.getUserAuthorizations(credentials);
+ for (ByteBuffer auth : authorizations)
+ if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
+ ConditionalSession cs = new ConditionalSession();
+ cs.auths = new Authorizations(authorizations);
+ cs.credentials = credentials;
+ cs.tableId = tableID;
+ cs.interruptFlag = new AtomicBoolean();
+
+ long sid = sessionManager.createSession(cs, false);
+ return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
+ }
+
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+ throws NoSuchScanIDException, TException {
+
+ ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
+
+ if (cs == null || cs.interruptFlag.get())
+ throw new NoSuchScanIDException();
+
+ Text tid = new Text(cs.tableId);
+ long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+
+ try{
+ Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+ new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+
+ for(KeyExtent ke : updates.keySet())
+ if(!ke.getTableId().equals(tid))
+ throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
+
+ ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
+
+ while (deferred.size() > 0) {
+ deferred = conditionalUpdate(cs, deferred, results, symbols);
+ }
+
+ return results;
+ } catch (IOException ioe) {
+ throw new TException(ioe);
+ }finally{
+ writeTracker.finishWrite(opid);
+ sessionManager.unreserveSession(sessID);
+ }
+ }
+
+ @Override
+ public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+ //this method should wait for any running conditional update to complete
+ //after this method returns a conditional update should not be able to start
+
+ ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
+ if (cs != null)
+ cs.interruptFlag.set(true);
+
+ cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+ if(cs != null)
+ sessionManager.removeSession(sessID, true);
+ }
+
+ @Override
+ public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+ sessionManager.removeSession(sessID, false);
+ }
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
@@ -2584,6 +2883,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private DistributedWorkQueue bulkFailedCopyQ;
+ private String lockID;
+
private static final String METRICS_PREFIX = "tserver";
private static ObjectName OBJECT_NAME = null;
@@ -2705,6 +3006,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tabletServerLock.tryLock(lw, lockContent)) {
log.debug("Obtained tablet server lock " + tabletServerLock.getLockPath());
+ lockID = tabletServerLock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/");
return;
}
log.info("Waiting for tablet server lock");
@@ -2735,7 +3037,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort);
announceExistence();
-
+
ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 451a079..205cebc 100644
--- a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -23,8 +23,6 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
-import org.junit.Assert;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -44,6 +42,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.junit.Assert;
import org.junit.Test;
public class BulkImporterTest {
@@ -67,8 +66,8 @@ public class BulkImporterTest {
}
@Override
- public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials) throws AccumuloException,
- AccumuloSecurityException, TableNotFoundException {
+ public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+ TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
throw new NotImplementedException();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
new file mode 100644
index 0000000..7e7480f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+
+/**
+ * A writer that will sometimes return unknown. When it returns unknown the condition may or may not have been written.
+ */
+public class FaultyConditionalWriter implements ConditionalWriter {
+
+ private ConditionalWriter cw;
+ private double up;
+ private Random rand;
+ private double wp;
+
+ public FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
+ this.cw = cw;
+ this.up = unknownProbability;
+ this.wp = writeProbability;
+ this.rand = new Random();
+
+ }
+
+ public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+ ArrayList<Result> resultList = new ArrayList<Result>();
+ ArrayList<ConditionalMutation> writes = new ArrayList<ConditionalMutation>();
+
+ while (mutations.hasNext()) {
+ ConditionalMutation cm = mutations.next();
+ if (rand.nextDouble() <= up && rand.nextDouble() > wp)
+ resultList.add(new Result(Status.UNKNOWN, cm, null));
+ else
+ writes.add(cm);
+ }
+
+ if (writes.size() > 0) {
+ Iterator<Result> results = cw.write(writes.iterator());
+
+ while (results.hasNext()) {
+ Result result = results.next();
+
+ if (rand.nextDouble() <= up && rand.nextDouble() <= wp)
+ result = new Result(Status.UNKNOWN, result.getMutation(), result.getTabletServer());
+ resultList.add(result);
+ }
+ }
+ return resultList.iterator();
+ }
+
+ public Result write(ConditionalMutation mutation) {
+ return write(Collections.singleton(mutation).iterator()).next();
+ }
+
+ @Override
+ public void close() {
+ cw.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
index e2db273..1c62720 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
@@ -30,6 +30,11 @@ public class BadIterator extends WrappingIterator {
}
@Override
+ public boolean hasTop() {
+ throw new NullPointerException();
+ }
+
+ @Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index a71b1ad..03eaefb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -17,10 +17,13 @@
package org.apache.accumulo.test.functional;
import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -30,13 +33,19 @@ import org.apache.accumulo.core.util.UtilWaitThread;
public class SlowIterator extends WrappingIterator {
static private final String SLEEP_TIME = "sleepTime";
+ static private final String SEEK_SLEEP_TIME = "seekSleepTime";
- long sleepTime;
+ private long sleepTime = 0;
+ private long seekSleepTime = 0;
public static void setSleepTime(IteratorSetting is, long millis) {
is.addOption(SLEEP_TIME, Long.toString(millis));
}
+ public static void setSeekSleepTime(IteratorSetting is, long t) {
+ is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
+ }
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
@@ -49,9 +58,20 @@ public class SlowIterator extends WrappingIterator {
}
@Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ UtilWaitThread.sleep(seekSleepTime);
+ super.seek(range, columnFamilies, inclusive);
+ }
+
+ @Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
- sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+ if (options.containsKey(SLEEP_TIME))
+ sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+
+ if (options.containsKey(SEEK_SLEEP_TIME))
+ seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
}
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index e33603f..c84fd7f 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -40,7 +40,10 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.data.thrift.MultiScanResult;
import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TMutation;
@@ -50,6 +53,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -200,6 +204,25 @@ public class NullTserver {
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveCompaction>();
}
+
+ @Override
+ public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+ throws ThriftSecurityException,
+ TException {
+ return null;
+ }
+
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+ throws NoSuchScanIDException, TException {
+ return null;
+ }
+
+ @Override
+ public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
+
+ @Override
+ public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
}
static class Opts extends Help {