You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/24 21:49:21 UTC

[2/6] ACCUMULO-1000 added conditional mutations to Accumulo

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
index ce84054..6f6a9ee 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.hadoop.io.Text;
 
 public class ByteBufferUtil {
@@ -76,4 +77,16 @@ public class ByteBufferUtil {
   public static String toString(ByteBuffer bytes) {
     return new String(bytes.array(), bytes.position(), bytes.remaining());
   }
+  
+  public static ByteBuffer toByteBuffers(ByteSequence bs) {
+    if (bs == null)
+      return null;
+
+    if (bs.isBackedByArray()) {
+      return ByteBuffer.wrap(bs.getBackingArray(), bs.offset(), bs.length());
+    } else {
+      // TODO create more efficient impl
+      return ByteBuffer.wrap(bs.toArray());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
new file mode 100644
index 0000000..b640581
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * 
+ */
+public class UnsynchronizedBuffer {
+  // created this little class instead of using ByteArrayOutput stream and DataOutputStream
+  // because both are synchronized... lots of small syncs slow things down
+  public static class Writer {
+    
+    int offset = 0;
+    byte data[];
+    
+    public Writer() {
+      data = new byte[64];
+    }
+    
+    public Writer(int initialCapacity) {
+      data = new byte[initialCapacity];
+    }
+    
+    private void reserve(int l) {
+      if (offset + l > data.length) {
+        int newSize = UnsynchronizedBuffer.nextArraySize(offset + l);
+
+        byte[] newData = new byte[newSize];
+        System.arraycopy(data, 0, newData, 0, offset);
+        data = newData;
+      }
+      
+    }
+    
+    public void add(byte[] bytes, int off, int length) {
+      reserve(length);
+      System.arraycopy(bytes, off, data, offset, length);
+      offset += length;
+    }
+    
+    public void add(boolean b) {
+      reserve(1);
+      if (b)
+        data[offset++] = 1;
+      else
+        data[offset++] = 0;
+    }
+    
+    public byte[] toArray() {
+      byte ret[] = new byte[offset];
+      System.arraycopy(data, 0, ret, 0, offset);
+      return ret;
+    }
+    
+    public ByteBuffer toByteBuffer() {
+      return ByteBuffer.wrap(data, 0, offset);
+    }
+
+    public void writeVInt(int i) {
+      writeVLong(i);
+    }
+
+    public void writeVLong(long i) {
+      reserve(9);
+      if (i >= -112 && i <= 127) {
+        data[offset++] = (byte) i;
+        return;
+      }
+      
+      int len = -112;
+      if (i < 0) {
+        i ^= -1L; // take one's complement'
+        len = -120;
+      }
+      
+      long tmp = i;
+      while (tmp != 0) {
+        tmp = tmp >> 8;
+        len--;
+      }
+      
+      data[offset++] = (byte) len;
+      
+      len = (len < -120) ? -(len + 120) : -(len + 112);
+      
+      for (int idx = len; idx != 0; idx--) {
+        int shiftbits = (idx - 1) * 8;
+        long mask = 0xFFL << shiftbits;
+        data[offset++] = (byte) ((i & mask) >> shiftbits);
+      }
+    }
+  }
+  
+  public static class Reader {
+    int offset;
+    byte data[];
+    
+    public Reader(byte b[]) {
+      this.data = b;
+    }
+    
+    public Reader(ByteBuffer buffer) {
+      if (buffer.hasArray()) {
+        offset = buffer.arrayOffset();
+        data = buffer.array();
+      } else {
+        data = new byte[buffer.remaining()];
+        buffer.get(data);
+      }
+    }
+
+    public int readInt() {
+      return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0);
+    }
+    
+    public long readLong() {
+      return (((long) data[offset++] << 56) + ((long) (data[offset++] & 255) << 48) + ((long) (data[offset++] & 255) << 40)
+          + ((long) (data[offset++] & 255) << 32) + ((long) (data[offset++] & 255) << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0));
+    }
+    
+    public void readBytes(byte b[]) {
+      System.arraycopy(data, offset, b, 0, b.length);
+      offset += b.length;
+    }
+    
+    public boolean readBoolean() {
+      return (data[offset++] == 1);
+    }
+    
+    public int readVInt() {
+      return (int) readVLong();
+    }
+
+    public long readVLong() {
+      byte firstByte = data[offset++];
+      int len = WritableUtils.decodeVIntSize(firstByte);
+      if (len == 1) {
+        return firstByte;
+      }
+      long i = 0;
+      for (int idx = 0; idx < len - 1; idx++) {
+        byte b = data[offset++];
+        i = i << 8;
+        i = i | (b & 0xFF);
+      }
+      return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+    }
+  }
+
+  /**
+   * Determines what next array size should be by rounding up to next power of two.
+   * 
+   */
+  public static int nextArraySize(int i) {
+    if (i < 0)
+      throw new IllegalArgumentException();
+    
+    if (i > (1 << 30))
+      return Integer.MAX_VALUE; // this is the next power of 2 minus one... a special case
+  
+    if (i == 0) {
+      return 1;
+    }
+    
+    // round up to next power of two
+    int ret = i;
+    ret--;
+    ret |= ret >> 1;
+    ret |= ret >> 2;
+    ret |= ret >> 4;
+    ret |= ret >> 8;
+    ret |= ret >> 16;
+    ret++;
+    
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/thrift/data.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift
index 4e2d3e6..ae6f439 100644
--- a/core/src/main/thrift/data.thrift
+++ b/core/src/main/thrift/data.thrift
@@ -110,10 +110,46 @@ struct UpdateErrors {
 	3:map<TKeyExtent, client.SecurityErrorCode> authorizationFailures
 }
 
+enum TCMStatus {
+	ACCEPTED,
+	REJECTED,
+	VIOLATED,
+	IGNORED
+}
+
+struct TCMResult {
+	1:i64 cmid,
+	2:TCMStatus status
+}
+
 struct MapFileInfo {
 	1:i64 estimatedSize
 }
 
+struct TCondition {
+	1:binary cf;
+	2:binary cq;
+	3:binary cv;
+	4:i64 ts;
+	5:bool hasTimestamp;
+	6:binary val;
+	7:binary iterators;
+}
+
+struct TConditionalMutation {
+	1:list<TCondition> conditions;
+	2:TMutation mutation;
+	3:i64 id;
+}
+
+struct TConditionalSession {
+	1:i64 sessionId;
+	2:string tserverLock;
+        3:i64 ttl;
+}
+
+typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
+
 typedef map<TKeyExtent,list<TMutation>> UpdateBatch
 
 typedef map<TKeyExtent, map<string, MapFileInfo>> TabletFiles

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index e6adbf4..4f9f13a 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -160,13 +160,21 @@ service TabletClientService extends client.ClientService {
   data.UpdateID startUpdate(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec),
   oneway void applyUpdates(1:trace.TInfo tinfo, 2:data.UpdateID updateID, 3:data.TKeyExtent keyExtent, 4:list<data.TMutation> mutations),
   data.UpdateErrors closeUpdate(2:trace.TInfo tinfo, 1:data.UpdateID updateID) throws (1:NoSuchScanIDException nssi),
-  
+
   //the following call supports making a single update to a tablet
   void update(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:data.TKeyExtent keyExtent, 3:data.TMutation mutation)
     throws (1:client.ThriftSecurityException sec, 
             2:NotServingTabletException nste, 
             3:ConstraintViolationException cve),
+
+  data.TConditionalSession startConditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:string tableID)
+     throws (1:client.ThriftSecurityException sec);
   
+  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID, 3:data.CMBatch mutations, 4:list<string> symbols)
+     throws (1:NoSuchScanIDException nssi);
+  void invalidateConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
+  oneway void closeConditionalUpdate(1:trace.TInfo tinfo, 2:data.UpdateID sessID);
+
   // on success, returns an empty list
   list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 0a34575..fe2f09c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -219,7 +219,7 @@ public class TabletLocatorImplTest extends TestCase {
   }
   
   private void runTest(TabletLocatorImpl metaCache, List<Mutation> ml, Map<String,Map<KeyExtent,List<String>>> emb, String... efailures) throws Exception {
-    Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
+    Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
     List<Mutation> afailures = new ArrayList<Mutation>();
     metaCache.binMutations(ml, binnedMutations, afailures, credential);
     
@@ -239,11 +239,11 @@ public class TabletLocatorImplTest extends TestCase {
     
   }
   
-  private void verify(Map<String,Map<KeyExtent,List<String>>> expected, Map<String,TabletServerMutations> actual) {
+  private void verify(Map<String,Map<KeyExtent,List<String>>> expected, Map<String,TabletServerMutations<Mutation>> actual) {
     assertEquals(expected.keySet(), actual.keySet());
     
     for (String server : actual.keySet()) {
-      TabletServerMutations atb = actual.get(server);
+      TabletServerMutations<Mutation> atb = actual.get(server);
       Map<KeyExtent,List<String>> etb = expected.get(server);
       
       assertEquals(etb.keySet(), atb.getMutations().keySet());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
index bf577c1..1608576 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,25 +40,25 @@ public class RelativeKeyTest {
   
   @Test
   public void testBasicRelativeKey() {
-    assertEquals(1, RelativeKey.nextArraySize(0));
-    assertEquals(1, RelativeKey.nextArraySize(1));
-    assertEquals(2, RelativeKey.nextArraySize(2));
-    assertEquals(4, RelativeKey.nextArraySize(3));
-    assertEquals(4, RelativeKey.nextArraySize(4));
-    assertEquals(8, RelativeKey.nextArraySize(5));
-    assertEquals(8, RelativeKey.nextArraySize(8));
-    assertEquals(16, RelativeKey.nextArraySize(9));
+    assertEquals(1, UnsynchronizedBuffer.nextArraySize(0));
+    assertEquals(1, UnsynchronizedBuffer.nextArraySize(1));
+    assertEquals(2, UnsynchronizedBuffer.nextArraySize(2));
+    assertEquals(4, UnsynchronizedBuffer.nextArraySize(3));
+    assertEquals(4, UnsynchronizedBuffer.nextArraySize(4));
+    assertEquals(8, UnsynchronizedBuffer.nextArraySize(5));
+    assertEquals(8, UnsynchronizedBuffer.nextArraySize(8));
+    assertEquals(16, UnsynchronizedBuffer.nextArraySize(9));
     
-    assertEquals(1 << 16, RelativeKey.nextArraySize((1 << 16) - 1));
-    assertEquals(1 << 16, RelativeKey.nextArraySize(1 << 16));
-    assertEquals(1 << 17, RelativeKey.nextArraySize((1 << 16) + 1));
+    assertEquals(1 << 16, UnsynchronizedBuffer.nextArraySize((1 << 16) - 1));
+    assertEquals(1 << 16, UnsynchronizedBuffer.nextArraySize(1 << 16));
+    assertEquals(1 << 17, UnsynchronizedBuffer.nextArraySize((1 << 16) + 1));
     
-    assertEquals(1 << 30, RelativeKey.nextArraySize((1 << 30) - 1));
+    assertEquals(1 << 30, UnsynchronizedBuffer.nextArraySize((1 << 30) - 1));
 
-    assertEquals(1 << 30, RelativeKey.nextArraySize(1 << 30));
+    assertEquals(1 << 30, UnsynchronizedBuffer.nextArraySize(1 << 30));
 
-    assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE - 1));
-    assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE));
+    assertEquals(Integer.MAX_VALUE, UnsynchronizedBuffer.nextArraySize(Integer.MAX_VALUE - 1));
+    assertEquals(Integer.MAX_VALUE, UnsynchronizedBuffer.nextArraySize(Integer.MAX_VALUE));
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java b/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
new file mode 100644
index 0000000..7487ba3
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+
+/**
+ * 
+ */
+public class ServerConditionalMutation extends ServerMutation {
+  
+  public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> {
+    @Override
+    public ServerConditionalMutation translate(TConditionalMutation input) {
+      return new ServerConditionalMutation(input);
+    }
+  }
+  
+  public static final TCMTranslator TCMT = new TCMTranslator();
+
+  private long cmid;
+  private List<TCondition> conditions;
+  
+  public ServerConditionalMutation(TConditionalMutation input) {
+    super(input.mutation);
+
+    this.cmid = input.id;
+    this.conditions = input.conditions;
+  }
+
+  public long getID() {
+    return cmid;
+  }
+  
+  public List<TCondition> getConditions() {
+    return conditions;
+  }
+  
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 2b98331..cebc338 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -328,6 +328,13 @@ public class SecurityOperation {
     return hasTablePermission(credentials, table, TablePermission.WRITE, true);
   }
   
+  public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException {
+    
+    authenticate(credentials);
+    
+    return hasTablePermission(credentials, tableID, TablePermission.WRITE, true) && hasTablePermission(credentials, tableID, TablePermission.READ, true);
+  }
+
   public boolean canSplitTablet(TCredentials credentials, String table) throws ThriftSecurityException {
     authenticate(credentials);
     return hasSystemPermission(credentials, SystemPermission.ALTER_TABLE, false) || hasSystemPermission(credentials, SystemPermission.SYSTEM, false)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
new file mode 100644
index 0000000..c25e729
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * 
+ */
+public class ConditionalMutationSet {
+
+  static interface DeferFilter {
+    void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
+  }
+  
+  static class DuplicateFitler implements DeferFilter {
+    public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+      okMutations.add(scml.get(0));
+      for (int i = 1; i < scml.size(); i++) {
+        if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
+          deferred.add(scml.get(i));
+        } else {
+          okMutations.add(scml.get(i));
+        }
+      }
+    }
+  }
+  
+  static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      List<ServerConditionalMutation> scml = entry.getValue();
+      List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
+      List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
+      filter.defer(scml, okMutations, deferred);
+      
+      if (deferred.size() > 0) {
+        scml.clear();
+        scml.addAll(okMutations);
+        List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
+        if (l == null) {
+          l = deferred;
+          deferredMutations.put(entry.getKey(), l);
+        } else {
+          l.addAll(deferred);
+        }
+
+      }
+    }
+  }
+  
+  static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    defer(updates, deferred, new DuplicateFitler());
+  }
+
+  static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      // TODO check if its already in sorted order?
+      // TODO maybe the potential benefit of sorting is not worth the cost
+      Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
+        @Override
+        public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
+          return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
new file mode 100644
index 0000000..f057ca3
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.accumulo.server.tabletserver.ConditionalMutationSet.DeferFilter;
+
+/**
+ * 
+ */
+class RowLocks {
+  
+  private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
+  
+  static class RowLock {
+    ReentrantLock rlock;
+    int count;
+    ByteSequence rowSeq;
+    
+    RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
+      this.rlock = rlock;
+      this.count = 0;
+      this.rowSeq = rowSeq;
+    }
+    
+    public boolean tryLock() {
+      return rlock.tryLock();
+    }
+    
+    public void lock() {
+      rlock.lock();
+    }
+    
+    public void unlock() {
+      rlock.unlock();
+    }
+  }
+  
+  private RowLock getRowLock(ArrayByteSequence rowSeq) {
+      RowLock lock = rowLocks.get(rowSeq);
+      if (lock == null) {
+        lock = new RowLock(new ReentrantLock(), rowSeq);
+        rowLocks.put(rowSeq, lock);
+      }
+      
+      lock.count++;
+      return lock;
+  }
+  
+  private void returnRowLock(RowLock lock) {
+      if (lock.count == 0)
+        throw new IllegalStateException();
+      lock.count--;
+      
+      if (lock.count == 0) {
+        rowLocks.remove(lock.rowSeq);
+      }
+  }
+  
+  List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    ArrayList<RowLock> locks = new ArrayList<RowLock>();
+    
+    // assume that mutations are in sorted order to avoid deadlock
+    synchronized (rowLocks) {
+      for (List<ServerConditionalMutation> scml : updates.values()) {
+        for (ServerConditionalMutation scm : scml) {
+          locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
+        }
+      }
+    }
+    
+    HashSet<ByteSequence> rowsNotLocked = null;
+
+    // acquire as many locks as possible, not blocking on rows that are already locked
+    if (locks.size() > 1) {
+      for (RowLock rowLock : locks) {
+        if (!rowLock.tryLock()) {
+          if (rowsNotLocked == null)
+            rowsNotLocked = new HashSet<ByteSequence>();
+          rowsNotLocked.add(rowLock.rowSeq);
+        }
+      }
+    } else {
+      // if there is only one lock, then wait for it
+      locks.get(0).lock();
+    }
+    
+    if (rowsNotLocked != null) {
+      
+      final HashSet<ByteSequence> rnlf = rowsNotLocked;
+      // assume will get locks needed, do something expensive otherwise
+      ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
+        @Override
+        public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+          for (ServerConditionalMutation scm : scml) {
+            if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
+              deferred.add(scm);
+            else
+              okMutations.add(scm);
+            
+          }
+        }
+      });
+      
+      ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
+      ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
+      for (RowLock rowLock : locks) {
+        if (rowsNotLocked.contains(rowLock.rowSeq)) {
+          locksToReturn.add(rowLock);
+        } else {
+          filteredLocks.add(rowLock);
+        }
+      }
+      
+      synchronized (rowLocks) {
+        for (RowLock rowLock : locksToReturn) {
+          returnRowLock(rowLock);
+        }
+      }
+
+      locks = filteredLocks;
+    }
+    return locks;
+  }
+  
+  void releaseRowLocks(List<RowLock> locks) {
+    for (RowLock rowLock : locks) {
+      rowLock.unlock();
+    }
+    
+    synchronized (rowLocks) {
+      for (RowLock rowLock : locks) {
+        returnRowLock(rowLock);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index e9b973a..01b0dc2 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -1651,7 +1651,7 @@ public class Tablet {
     }
   }
   
-  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
+  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
     
     // log.info("In nextBatch..");
     
@@ -1739,7 +1739,7 @@ public class Tablet {
     public long numBytes;
   }
   
-  Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
+  Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
     // do a test to see if this range falls within the tablet, if it does not
     // then clip will throw an exception
@@ -1873,14 +1873,14 @@ public class Tablet {
     // scan options
     Authorizations authorizations;
     byte[] defaultLabels;
-    HashSet<Column> columnSet;
+    Set<Column> columnSet;
     List<IterInfo> ssiList;
     Map<String,Map<String,String>> ssio;
     AtomicBoolean interruptFlag;
     int num;
     boolean isolated;
     
-    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
         Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
       this.num = num;
       this.authorizations = authorizations;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 7425fed..ccb95fc 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -65,6 +65,8 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.CompressedIterators;
+import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.TabletType;
 import org.apache.accumulo.core.client.impl.Translator;
@@ -87,7 +89,12 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
 import org.apache.accumulo.core.data.thrift.MultiScanResult;
 import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
 import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
 import org.apache.accumulo.core.data.thrift.TKey;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TKeyValue;
@@ -140,6 +147,7 @@ import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -158,6 +166,7 @@ import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
 import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
@@ -336,12 +345,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
     SecureRandom random;
     Map<Long,Session> sessions;
+    long maxIdle;
     
     SessionManager(AccumuloConfiguration conf) {
       random = new SecureRandom();
       sessions = new HashMap<Long,Session>();
       
-      final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
       
       Runnable r = new Runnable() {
         @Override
@@ -369,6 +379,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       return sid;
     }
     
+    long getMaxIdleTime() {
+      return maxIdle;
+    }
+
     /**
      * while a session is reserved, it cannot be canceled or removed
      * 
@@ -387,9 +401,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       
     }
     
+    synchronized Session reserveSession(long sessionId, boolean wait) {
+      Session session = sessions.get(sessionId);
+      if (session != null) {
+        while(wait && session.reserved){
+          try {
+            wait(1000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException();
+          }
+        }
+        
+        if (session.reserved)
+          throw new IllegalStateException();
+        session.reserved = true;
+      }
+      
+      return session;
+      
+    }
+    
     synchronized void unreserveSession(Session session) {
       if (!session.reserved)
         throw new IllegalStateException();
+      notifyAll();
       session.reserved = false;
       session.lastAccessTime = System.currentTimeMillis();
     }
@@ -399,7 +434,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       if (session != null)
         unreserveSession(session);
     }
-    
+        
     synchronized Session getSession(long sessionId) {
       Session session = sessions.get(sessionId);
       if (session != null)
@@ -408,9 +443,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
     
     Session removeSession(long sessionId) {
+      return removeSession(sessionId, false);
+    }
+    
+    Session removeSession(long sessionId, boolean unreserve) {
       Session session = null;
       synchronized (this) {
         session = sessions.remove(sessionId);
+        if(unreserve && session != null)
+          unreserveSession(session);
       }
       
       // do clean up out side of lock..
@@ -709,6 +750,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
   }
   
+  private static class ConditionalSession extends Session {
+    public TCredentials credentials;
+    public Authorizations auths;
+    public String tableId;
+    public AtomicBoolean interruptFlag;
+    
+    @Override
+    public void cleanup() {
+      interruptFlag.set(true);
+    }
+  }
+  
   private static class UpdateSession extends Session {
     public Tablet currentTablet;
     public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -858,6 +911,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
     WriteTracker writeTracker = new WriteTracker();
     
+    private RowLocks rowLocks = new RowLocks();
+
     ThriftClientHandler() {
       super(instance, watcher);
       log.debug(ThriftClientHandler.class.getName() + " created");
@@ -1687,6 +1742,250 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         writeTracker.finishWrite(opid);
       }
     }
+
+    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+        List<String> symbols) throws IOException {
+      Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
+      
+      CompressedIterators compressedIters = new CompressedIterators(symbols);
+
+      while (iter.hasNext()) {
+        Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
+        Tablet tablet = onlineTablets.get(entry.getKey());
+        
+        if (tablet == null || tablet.isClosed()) {
+          for (ServerConditionalMutation scm : entry.getValue())
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          iter.remove();
+        } else {
+          List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+
+          for (ServerConditionalMutation scm : entry.getValue()) {
+            if (checkCondition(results, cs, compressedIters, tablet, scm))
+              okMutations.add(scm);
+          }
+          
+          entry.setValue(okMutations);
+        }
+        
+      }
+    }
+
+    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters,
+        Tablet tablet, ServerConditionalMutation scm) throws IOException {
+      boolean add = true;
+      
+      Set<Column> emptyCols = Collections.emptySet();
+
+      for(TCondition tc : scm.getConditions()){
+      
+        Range range;
+        if (tc.hasTimestamp)
+          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+        else
+          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+        
+        IterConfig ic = compressedIters.decompress(tc.iterators);
+
+        Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+        
+        try {
+          ScanBatch batch = scanner.read();
+          
+          Value val = null;
+          
+          for (KVEntry entry2 : batch.results) {
+            val = entry2.getValue();
+            break;
+          }
+          
+          if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+            results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+            add = false;
+            break;
+          }
+          
+        } catch (TabletClosedException e) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        } catch (IterationInterruptedException iie) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        } catch (TooManyFilesException tmfe) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          add = false;
+          break;
+        }
+      }
+      return add;
+    }
+
+    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
+      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+      
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+
+      boolean sessionCanceled = sess.interruptFlag.get();
+
+      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+        Tablet tablet = onlineTablets.get(entry.getKey());
+        if (tablet == null || tablet.isClosed() || sessionCanceled) {
+          for (ServerConditionalMutation scm : entry.getValue())
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+        } else {
+          try {
+            
+            List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+            if (mutations.size() > 0) {
+
+              CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
+              
+              if (cs == null) {
+                for (ServerConditionalMutation scm : entry.getValue())
+                  results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+              } else {
+                for (ServerConditionalMutation scm : entry.getValue())
+                  results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+                sendables.put(cs, mutations);
+              }
+            }
+          } catch (TConstraintViolationException e) {
+            if (e.getNonViolators().size() > 0) {
+              sendables.put(e.getCommitSession(), e.getNonViolators());
+              for (Mutation m : e.getNonViolators())
+                results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+            }
+            
+            for (Mutation m : e.getViolators())
+              results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+          }
+        }
+      }
+      
+      while (true && sendables.size() > 0) {
+        try {
+          logger.logManyTablets(sendables);
+          break;
+        } catch (IOException ex) {
+          log.warn("logging mutations failed, retrying");
+        } catch (FSError ex) { // happens when DFS is localFS
+          log.warn("logging mutations failed, retrying");
+        } catch (Throwable t) {
+          log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+          throw new RuntimeException(t);
+        }
+      }
+      
+      for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+        CommitSession commitSession = entry.getKey();
+        List<Mutation> mutations = entry.getValue();
+        
+        commitSession.commit(mutations);
+      }
+
+    }
+
+    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+        ArrayList<TCMResult> results, List<String> symbols) throws IOException {
+      // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
+      ConditionalMutationSet.sortConditionalMutations(updates);
+      
+      Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+
+      // can not process two mutations for the same row, because one will not see what the other writes
+      ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
+
+      // get as many locks as possible w/o blocking... defer any rows that are locked
+      List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+      try {
+        checkConditions(updates, results, cs, symbols);
+        writeConditionalMutations(updates, results, cs);
+      } finally {
+        rowLocks.releaseRowLocks(locks);
+      }
+      return deferred;
+    }
+    
+    @Override
+    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+        throws ThriftSecurityException, TException {
+      
+      Authorizations userauths = null;
+      if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      
+      userauths = security.getUserAuthorizations(credentials);
+      for (ByteBuffer auth : authorizations)
+        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
+      ConditionalSession cs = new ConditionalSession();
+      cs.auths = new Authorizations(authorizations);
+      cs.credentials = credentials;
+      cs.tableId = tableID;
+      cs.interruptFlag = new AtomicBoolean();
+      
+      long sid = sessionManager.createSession(cs, false);
+      return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
+    }
+
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+        throws NoSuchScanIDException, TException {
+      
+      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
+      
+      if (cs == null || cs.interruptFlag.get())
+        throw new NoSuchScanIDException();
+      
+      Text tid = new Text(cs.tableId);
+      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+      
+      try{
+        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+            new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+
+        for(KeyExtent ke : updates.keySet())
+          if(!ke.getTableId().equals(tid))
+            throw new IllegalArgumentException("Unexpected table id "+tid+" != "+ke.getTableId());
+        
+        ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+        
+        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
+  
+        while (deferred.size() > 0) {
+          deferred = conditionalUpdate(cs, deferred, results, symbols);
+        }
+  
+        return results;
+      } catch (IOException ioe) {
+        throw new TException(ioe);
+      }finally{
+        writeTracker.finishWrite(opid);
+        sessionManager.unreserveSession(sessID);
+      }
+    }
+
+    @Override
+    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      //this method should wait for any running conditional update to complete
+      //after this method returns a conditional update should not be able to start
+      
+      ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
+      if (cs != null)
+        cs.interruptFlag.set(true);
+      
+      cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+      if(cs != null)
+        sessionManager.removeSession(sessID, true);
+    }
+
+    @Override
+    public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      sessionManager.removeSession(sessID, false);
+    }
     
     @Override
     public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
@@ -2584,6 +2883,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   
   private DistributedWorkQueue bulkFailedCopyQ;
   
+  private String lockID;
+  
   private static final String METRICS_PREFIX = "tserver";
   
   private static ObjectName OBJECT_NAME = null;
@@ -2705,6 +3006,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         
         if (tabletServerLock.tryLock(lw, lockContent)) {
           log.debug("Obtained tablet server lock " + tabletServerLock.getLockPath());
+          lockID = tabletServerLock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/");
           return;
         }
         log.info("Waiting for tablet server lock");
@@ -2735,7 +3037,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
     clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort);
     announceExistence();
-    
+
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
     
     bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 451a079..205cebc 100644
--- a/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -23,8 +23,6 @@ import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.junit.Assert;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -44,6 +42,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class BulkImporterTest {
@@ -67,8 +66,8 @@ public class BulkImporterTest {
     }
     
     @Override
-    public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials) throws AccumuloException,
-        AccumuloSecurityException, TableNotFoundException {
+    public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+        TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
       throw new NotImplementedException();
     }
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
new file mode 100644
index 0000000..7e7480f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+
+/**
+ * A writer that will sometimes return unknown. When it returns unknown the condition may or may not have been written.
+ */
+public class FaultyConditionalWriter implements ConditionalWriter {
+  
+  private ConditionalWriter cw;
+  private double up;
+  private Random rand;
+  private double wp;
+  
+  public FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
+    this.cw = cw;
+    this.up = unknownProbability;
+    this.wp = writeProbability;
+    this.rand = new Random();
+
+  }
+
+  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+    ArrayList<Result> resultList = new ArrayList<Result>();
+    ArrayList<ConditionalMutation> writes = new ArrayList<ConditionalMutation>();
+    
+    while (mutations.hasNext()) {
+      ConditionalMutation cm = mutations.next();
+      if (rand.nextDouble() <= up && rand.nextDouble() > wp)
+        resultList.add(new Result(Status.UNKNOWN, cm, null));
+      else
+        writes.add(cm);
+    }
+    
+    if (writes.size() > 0) {
+      Iterator<Result> results = cw.write(writes.iterator());
+      
+      while (results.hasNext()) {
+        Result result = results.next();
+        
+        if (rand.nextDouble() <= up && rand.nextDouble() <= wp)
+          result = new Result(Status.UNKNOWN, result.getMutation(), result.getTabletServer());
+        resultList.add(result);
+      }
+    }
+    return resultList.iterator();
+  }
+  
+  public Result write(ConditionalMutation mutation) {
+    return write(Collections.singleton(mutation).iterator()).next();
+  }
+  
+  @Override
+  public void close() {
+    cw.close();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
index e2db273..1c62720 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
@@ -30,6 +30,11 @@ public class BadIterator extends WrappingIterator {
   }
   
   @Override
+  public boolean hasTop() {
+    throw new NullPointerException();
+  }
+  
+  @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index a71b1ad..03eaefb 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.test.functional;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -30,13 +33,19 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 public class SlowIterator extends WrappingIterator {
 
   static private final String SLEEP_TIME = "sleepTime";
+  static private final String SEEK_SLEEP_TIME = "seekSleepTime";
   
-  long sleepTime;
+  private long sleepTime = 0;
+  private long seekSleepTime = 0;
   
   public static void setSleepTime(IteratorSetting is, long millis) {
     is.addOption(SLEEP_TIME, Long.toString(millis));  
   }
   
+  public static void setSeekSleepTime(IteratorSetting is, long t) {
+    is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
@@ -49,9 +58,20 @@ public class SlowIterator extends WrappingIterator {
   }
   
   @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    UtilWaitThread.sleep(seekSleepTime);
+    super.seek(range, columnFamilies, inclusive);
+  }
+  
+  @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
-    sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+    if (options.containsKey(SLEEP_TIME))
+      sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+    
+    if (options.containsKey(SEEK_SLEEP_TIME))
+      seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
   }
+
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index e33603f..c84fd7f 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -40,7 +40,10 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
 import org.apache.accumulo.core.data.thrift.MultiScanResult;
 import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
 import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
 import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TMutation;
@@ -50,6 +53,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -200,6 +204,25 @@ public class NullTserver {
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveCompaction>();
     }
+
+    @Override
+    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+        throws ThriftSecurityException,
+        TException {
+      return null;
+    }
+
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+        throws NoSuchScanIDException, TException {
+      return null;
+    }
+
+    @Override
+    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
+    
+    @Override
+    public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
   }
   
   static class Opts extends Help {