You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/13 01:29:58 UTC
svn commit: r1502726 [4/4] - in /accumulo/branches/ACCUMULO-1000:
core/src/main/java/org/apache/accumulo/core/client/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/client/mock/
core/src/main/java/o...
Modified: accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java Fri Jul 12 23:29:57 2013
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.hadoop.io.Text;
public class ByteBufferUtil {
@@ -76,4 +77,16 @@ public class ByteBufferUtil {
/**
 * Decodes the readable part of a buffer (from its position up to its limit) as a string using the platform default
 * charset. The position and limit of {@code bytes} are left unchanged.
 *
 * @param bytes
 *          buffer whose remaining bytes are decoded
 * @return the decoded string
 */
public static String toString(ByteBuffer bytes) {
  if (bytes.hasArray()) {
    // array() returns the whole backing array; a sliced or wrapped-with-offset buffer starts at arrayOffset()
    // within it, so the offset has to be added to the position.
    return new String(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
  }

  // direct and read-only buffers expose no array; copy through a duplicate so the caller's position is untouched
  byte[] copy = new byte[bytes.remaining()];
  bytes.duplicate().get(copy);
  return new String(copy);
}
+
+ /**
+  * Exposes a byte sequence as a {@link ByteBuffer}.
+  *
+  * @param bs
+  *          the sequence to expose, may be null
+  * @return null when {@code bs} is null; otherwise a buffer wrapping the sequence's backing array (at its offset and
+  *         length) when it has one, or wrapping the result of {@code bs.toArray()} when it does not
+  */
+ public static ByteBuffer toByteBuffers(ByteSequence bs) {
+   if (bs == null) {
+     return null;
+   }
+
+   if (!bs.isBackedByArray()) {
+     // TODO create more efficient impl
+     return ByteBuffer.wrap(bs.toArray());
+   }
+
+   return ByteBuffer.wrap(bs.getBackingArray(), bs.offset(), bs.length());
+ }
}
Modified: accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift Fri Jul 12 23:29:57 2013
@@ -110,10 +110,41 @@ struct UpdateErrors {
3:map<TKeyExtent, client.SecurityErrorCode> authorizationFailures
}
+// Outcome reported for one conditional mutation. The notes below describe how the tablet server
+// added in this commit assigns each value.
+enum TCMStatus {
+ // the conditions held and the mutation was prepared for commit
+ ACCEPTED,
+ // a condition did not match the value found in the tablet
+ REJECTED,
+ // the mutation was reported as a constraint violator
+ VIOLATED,
+ // the mutation was not attempted (tablet not online or closed, or the condition check could not complete)
+ IGNORED
+}
+
+// Result for a single conditional mutation, tied back to the request by the mutation's id.
+struct TCMResult {
+ // the id of the TConditionalMutation this result is for
+ 1:i64 cmid,
+ // what happened to that mutation
+ 2:TCMStatus status
+}
+
struct MapFileInfo {
1:i64 estimatedSize
}
+// One condition of a conditional mutation: a column (and optionally a timestamp) in the mutation's
+// row together with the value that is expected there.
+struct TCondition {
+ // column family, qualifier and visibility used to build the exact range that is scanned
+ 1:binary cf;
+ 2:binary cq;
+ 3:binary cv;
+ // timestamp to match; only consulted when hasTimestamp is true
+ 4:i64 ts;
+ 5:bool hasTimestamp;
+ // expected value; when unset the condition only holds if the scan finds no entry
+ 6:binary val;
+ // scan iterator list and per-iterator options handed to the scanner that evaluates the condition
+ 7:list<IterInfo> ssiList
+ 8:map<string, map<string, string>> ssio
+}
+
+// A mutation plus the conditions attached to it.
+struct TConditionalMutation {
+ // conditions evaluated against the mutation's row before it is written
+ 1:list<TCondition> conditions;
+ // the mutation to write
+ 2:TMutation mutation;
+ // identifier echoed back in TCMResult.cmid so results can be matched to mutations
+ 3:i64 id;
+}
+
+typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
+
typedef map<TKeyExtent,list<TMutation>> UpdateBatch
typedef map<TKeyExtent, map<string, MapFileInfo>> TabletFiles
Modified: accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift Fri Jul 12 23:29:57 2013
@@ -160,13 +160,15 @@ service TabletClientService extends clie
data.UpdateID startUpdate(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec),
oneway void applyUpdates(1:trace.TInfo tinfo, 2:data.UpdateID updateID, 3:data.TKeyExtent keyExtent, 4:list<data.TMutation> mutations),
data.UpdateErrors closeUpdate(2:trace.TInfo tinfo, 1:data.UpdateID updateID) throws (1:NoSuchScanIDException nssi),
-
+
//the following call supports making a single update to a tablet
void update(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:data.TKeyExtent keyExtent, 3:data.TMutation mutation)
throws (1:client.ThriftSecurityException sec,
2:NotServingTabletException nste,
3:ConstraintViolationException cve),
+ list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations);
+
// on success, returns an empty list
list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+
+/**
+ * Server side form of a thrift {@link TConditionalMutation}: the wrapped mutation (handled by {@link ServerMutation})
+ * together with the conditional mutation's id and its list of conditions.
+ */
+public class ServerConditionalMutation extends ServerMutation {
+
+  /**
+   * Translates thrift {@link TConditionalMutation} objects into {@link ServerConditionalMutation} objects.
+   */
+  public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> {
+    @Override
+    public ServerConditionalMutation translate(TConditionalMutation input) {
+      return new ServerConditionalMutation(input);
+    }
+  }
+
+  /** Shared translator instance. */
+  public static final TCMTranslator TCMT = new TCMTranslator();
+
+  // both fields are assigned once from the thrift object and never reassigned
+  private final long cmid;
+  private final List<TCondition> conditions;
+
+  /**
+   * @param input
+   *          thrift object supplying the mutation, the id and the conditions; the condition list is kept by
+   *          reference, not copied
+   */
+  public ServerConditionalMutation(TConditionalMutation input) {
+    super(input.mutation);
+
+    this.cmid = input.id;
+    this.conditions = input.conditions;
+  }
+
+  /**
+   * @return the id carried by the thrift conditional mutation
+   */
+  public long getID() {
+    return cmid;
+  }
+
+  /**
+   * @return the conditions carried by the thrift conditional mutation (the same list instance that was passed in)
+   */
+  public List<TCondition> getConditions() {
+    return conditions;
+  }
+
+}
Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ *
+ */
+public class ConditionalMutationSet {
+
+ static interface DeferFilter {
+ void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
+ }
+
+ static class DuplicateFitler implements DeferFilter {
+ public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+ okMutations.add(scml.get(0));
+ for (int i = 1; i < scml.size(); i++) {
+ if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
+ deferred.add(scml.get(i));
+ } else {
+ okMutations.add(scml.get(i));
+ }
+ }
+ }
+ }
+
+ static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+ List<ServerConditionalMutation> scml = entry.getValue();
+ List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
+ List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
+ filter.defer(scml, okMutations, deferred);
+
+ if (deferred.size() > 0) {
+ scml.clear();
+ scml.addAll(okMutations);
+ List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
+ if (l == null) {
+ l = deferred;
+ deferredMutations.put(entry.getKey(), l);
+ } else {
+ l.addAll(deferred);
+ }
+
+ }
+ }
+ }
+
+ static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+ defer(updates, deferred, new DuplicateFitler());
+ }
+
+ static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+ // TODO check if its already in sorted order?
+ // TODO maybe the potential benefit of sorting is not worth the cost
+ Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
+ @Override
+ public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
+ return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
+ }
+ });
+ }
+ }
+}
Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.accumulo.server.tabletserver.ConditionalMutationSet.DeferFilter;
+
+/**
+ * Hands out one {@link ReentrantLock} per row and keeps a reference count for each, so that a row's lock object exists
+ * only while some caller has requested it. The lock table itself is guarded by synchronizing on the map.
+ */
+class RowLocks {
+
+  // final because it doubles as the monitor guarding the table; reassigning it would silently split the monitor
+  private final Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
+
+  /**
+   * A row's lock together with the number of outstanding requests for it.
+   */
+  static class RowLock {
+    ReentrantLock rlock;
+    int count;
+    ByteSequence rowSeq;
+
+    RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
+      this.rlock = rlock;
+      this.count = 0;
+      this.rowSeq = rowSeq;
+    }
+
+    public boolean tryLock() {
+      return rlock.tryLock();
+    }
+
+    public void lock() {
+      rlock.lock();
+    }
+
+    public void unlock() {
+      rlock.unlock();
+    }
+  }
+
+  /**
+   * Looks up (or creates) the lock for a row and bumps its reference count. Callers in this class hold the monitor of
+   * {@code rowLocks} while calling.
+   */
+  private RowLock getRowLock(ArrayByteSequence rowSeq) {
+    RowLock lock = rowLocks.get(rowSeq);
+    if (lock == null) {
+      lock = new RowLock(new ReentrantLock(), rowSeq);
+      rowLocks.put(rowSeq, lock);
+    }
+
+    lock.count++;
+    return lock;
+  }
+
+  /**
+   * Drops one reference to a row's lock and removes it from the table when no references remain. Callers in this class
+   * hold the monitor of {@code rowLocks} while calling.
+   *
+   * @throws IllegalStateException
+   *           if the lock has no outstanding references
+   */
+  private void returnRowLock(RowLock lock) {
+    if (lock.count == 0)
+      throw new IllegalStateException();
+    lock.count--;
+
+    if (lock.count == 0) {
+      rowLocks.remove(lock.rowSeq);
+    }
+  }
+
+  /**
+   * Locks the rows of the given mutations. With more than one row, locks are taken with {@code tryLock()}; mutations
+   * whose row could not be locked are moved from {@code updates} into {@code deferred}. With exactly one row the call
+   * blocks until that row's lock is obtained.
+   *
+   * @param updates
+   *          mutations per tablet; lists are edited in place when mutations are deferred
+   * @param deferred
+   *          receives the mutations whose rows were already locked
+   * @return the locks that are now held and must be passed to {@link #releaseRowLocks(List)}; empty when
+   *         {@code updates} contains no mutations
+   */
+  List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    ArrayList<RowLock> locks = new ArrayList<RowLock>();
+
+    // assume that mutations are in sorted order to avoid deadlock
+    synchronized (rowLocks) {
+      for (List<ServerConditionalMutation> scml : updates.values()) {
+        for (ServerConditionalMutation scm : scml) {
+          locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
+        }
+      }
+    }
+
+    // no mutations means no rows to lock; without this guard the single-lock branch below calls get(0) on an
+    // empty list and throws IndexOutOfBoundsException
+    if (locks.isEmpty())
+      return locks;
+
+    HashSet<ByteSequence> rowsNotLocked = null;
+
+    // acquire as many locks as possible, not blocking on rows that are already locked
+    if (locks.size() > 1) {
+      for (RowLock rowLock : locks) {
+        if (!rowLock.tryLock()) {
+          if (rowsNotLocked == null)
+            rowsNotLocked = new HashSet<ByteSequence>();
+          rowsNotLocked.add(rowLock.rowSeq);
+        }
+      }
+    } else {
+      // if there is only one lock, then wait for it
+      locks.get(0).lock();
+    }
+
+    if (rowsNotLocked != null) {
+
+      final HashSet<ByteSequence> rnlf = rowsNotLocked;
+      // assume will get locks needed, do something expensive otherwise
+      ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
+        @Override
+        public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+          for (ServerConditionalMutation scm : scml) {
+            if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
+              deferred.add(scm);
+            else
+              okMutations.add(scm);
+
+          }
+        }
+      });
+
+      // split the requested locks into the ones actually held and the ones whose reference must be given back
+      ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
+      ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
+      for (RowLock rowLock : locks) {
+        if (rowsNotLocked.contains(rowLock.rowSeq)) {
+          locksToReturn.add(rowLock);
+        } else {
+          filteredLocks.add(rowLock);
+        }
+      }
+
+      synchronized (rowLocks) {
+        for (RowLock rowLock : locksToReturn) {
+          returnRowLock(rowLock);
+        }
+      }
+
+      // NOTE(review): when every tryLock() fails nothing is held and everything is deferred; a caller that
+      // retries deferred mutations in a loop will spin until a lock frees up - confirm that is acceptable
+      locks = filteredLocks;
+    }
+    return locks;
+  }
+
+  /**
+   * Unlocks every lock in the list and then drops the matching references from the lock table.
+   *
+   * @param locks
+   *          the list returned by {@link #acquireRowlocks(Map, Map)}
+   */
+  void releaseRowLocks(List<RowLock> locks) {
+    for (RowLock rowLock : locks) {
+      rowLock.unlock();
+    }
+
+    synchronized (rowLocks) {
+      for (RowLock rowLock : locks) {
+        returnRowLock(rowLock);
+      }
+    }
+  }
+
+}
Modified: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Jul 12 23:29:57 2013
@@ -88,7 +88,11 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.data.thrift.MultiScanResult;
import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
import org.apache.accumulo.core.data.thrift.TKey;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
@@ -141,6 +145,7 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -159,6 +164,7 @@ import org.apache.accumulo.server.securi
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
@@ -1690,6 +1696,202 @@ public class TabletServer extends Abstra
}
}
+ private RowLocks rowLocks = new RowLocks();
+
+ /**
+  * Evaluates the conditions of every mutation in {@code updates} and strips out the mutations that must not be
+  * written. For each tablet that is not online or is closed, all of its mutations are reported as
+  * {@code IGNORED} and the tablet's entry is removed. Otherwise each condition is checked by scanning the exact
+  * column (and timestamp, when given) in the mutation's row: a mismatch reports {@code REJECTED}, a scan that cannot
+  * complete reports {@code IGNORED}. Mutations whose conditions all hold stay in the tablet's list.
+  *
+  * @param updates
+  *          mutations per tablet; edited in place
+  * @param results
+  *          receives a result for every mutation that is dropped here
+  * @param authorizations
+  *          authorizations used for the condition scans
+  */
+ private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, Authorizations authorizations) {
+   Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
+
+   // TODO use constant
+   HashSet<Column> columns = new HashSet<Column>();
+
+   while (iter.hasNext()) {
+     Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
+     Tablet tablet = onlineTablets.get(entry.getKey());
+
+     if (tablet == null || tablet.isClosed()) {
+       for (ServerConditionalMutation scm : entry.getValue())
+         results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+       iter.remove();
+     } else {
+       List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+
+       // TODO extract to method
+       for (ServerConditionalMutation scm : entry.getValue()) {
+         boolean add = true;
+         for (TCondition tc : scm.getConditions()) {
+
+           Range range;
+           if (tc.hasTimestamp)
+             range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+           else
+             range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+
+           AtomicBoolean interruptFlag = new AtomicBoolean();
+
+           // TODO use one iterator per tablet, push checks into tablet?
+           Scanner scanner = tablet.createScanner(range, 1, columns, authorizations, tc.ssiList, tc.ssio, false, interruptFlag);
+
+           try {
+             ScanBatch batch = scanner.read();
+
+             // only the first entry of the batch is relevant
+             Value val = null;
+
+             for (KVEntry entry2 : batch.results) {
+               val = entry2.getValue();
+               break;
+             }
+
+             // the condition fails when exactly one of found/expected is absent, or when both exist but differ
+             if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+               results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+               add = false;
+               break;
+             }
+
+           } catch (TabletClosedException e) {
+             // TODO ignore rest of tablets mutations
+             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+             add = false;
+             break;
+           } catch (IterationInterruptedException iie) {
+             // TODO determine why this happened, ignore rest of tablets mutations?
+             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+             add = false;
+             break;
+           } catch (TooManyFilesException tmfe) {
+             // TODO handle differently?
+             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+             add = false;
+             break;
+           } catch (IOException e) {
+             // The condition could not be evaluated, so the mutation must not be written. Previously the
+             // exception was printed and swallowed, which let the mutation through as if the condition held.
+             log.warn("Failed to check condition for conditional mutation " + scm.getID(), e);
+             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+             add = false;
+             break;
+           } finally {
+             scanner.close();
+           }
+         }
+
+         if (add)
+           okMutations.add(scm);
+       }
+
+       // TODO just rebuild map
+       entry.getValue().clear();
+       entry.getValue().addAll(okMutations);
+     }
+
+   }
+ }
+
+ /**
+  * Writes the mutations that remain in {@code updates}: prepares them for commit per tablet, records a result for
+  * each one, logs all prepared mutations, and then commits them.
+  *
+  * @param updates
+  *          mutations per tablet
+  * @param results
+  *          receives a result for every mutation handled here
+  * @param credentials
+  *          credentials passed to the constraint environment
+  */
+ private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, TCredentials credentials) {
+ Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+
+ // mutations that were successfully prepared, keyed by the commit session that will commit them
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+
+ // TODO stats
+
+ for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+ Tablet tablet = onlineTablets.get(entry.getKey());
+ if (tablet == null || tablet.isClosed()) {
+ // the tablet is no longer available, so none of its mutations are attempted
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ // TODO write tracker
+
+ try {
+
+ // view the list of conditional mutations as a list of plain mutations (unchecked cast, no copy)
+ List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+ if (mutations.size() > 0) {
+
+ CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+
+ if (cs == null) {
+ // no commit session was returned; report the mutations as not attempted
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ } else {
+ // note: ACCEPTED is recorded here, before the mutations are logged and committed below
+ for (ServerConditionalMutation scm : entry.getValue())
+ results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+ sendables.put(cs, mutations);
+ }
+ }
+ } catch (TConstraintViolationException e) {
+ // the exception carries the mutations that passed the constraints and those that did not
+ if (e.getNonViolators().size() > 0) {
+ sendables.put(e.getCommitSession(), e.getNonViolators());
+ for (Mutation m : e.getNonViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+ }
+
+ for (Mutation m : e.getViolators())
+ results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+ }
+ }
+ }
+
+ // log the prepared mutations, retrying immediately (no delay) on IOException or FSError until it succeeds
+ while (true && sendables.size() > 0) {
+ try {
+ logger.logManyTablets(sendables);
+ break;
+ } catch (IOException ex) {
+ log.warn("logging mutations failed, retrying");
+ } catch (FSError ex) { // happens when DFS is localFS
+ log.warn("logging mutations failed, retrying");
+ } catch (Throwable t) {
+ // anything else aborts the write; the commit loop below is not reached
+ log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ // logging succeeded (or there was nothing to log): commit each session's mutations
+ for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+ CommitSession commitSession = entry.getKey();
+ List<Mutation> mutations = entry.getValue();
+
+ commitSession.commit(mutations);
+ }
+
+ }
+
+ /**
+  * Runs one pass over a batch of conditional mutations: sorts each tablet's mutations by row, sets aside duplicate
+  * rows and rows that are currently locked, then checks conditions and writes what is left while holding the row
+  * locks.
+  *
+  * @return the mutations that were set aside and still need to be processed in a later pass
+  */
+ private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, List<ByteBuffer> authorizations,
+     Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results) {
+   Map<KeyExtent,List<ServerConditionalMutation>> postponed = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+
+   // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
+   ConditionalMutationSet.sortConditionalMutations(updates);
+
+   // can not process two mutations for the same row, because one will not see what the other writes
+   ConditionalMutationSet.deferDuplicatesRows(updates, postponed);
+
+   // get as many locks as possible w/o blocking... defer any rows that are locked
+   List<RowLock> heldLocks = rowLocks.acquireRowlocks(updates, postponed);
+   try {
+     Authorizations auths = new Authorizations(authorizations);
+     checkConditions(updates, results, auths);
+     writeConditionalMutations(updates, results, credentials);
+   } finally {
+     rowLocks.releaseRowLocks(heldLocks);
+   }
+
+   return postponed;
+ }
+
+ /**
+  * Thrift entry point for conditional updates. Translates the request into server side mutations and processes
+  * them in passes, feeding the mutations deferred by one pass into the next, until none are deferred.
+  *
+  * @return the results accumulated over all passes
+  */
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
+     Map<TKeyExtent,List<TConditionalMutation>> mutations) throws TException {
+   // TODO check credentials, permissions, and authorizations
+   // TODO sessions, should show up in list scans
+   // TODO timeout like scans do
+
+   Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation> listTranslator = new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(
+       ServerConditionalMutation.TCMT);
+   Map<KeyExtent,List<ServerConditionalMutation>> pending = Translator.translate(mutations, Translator.TKET, listTranslator);
+
+   ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+
+   // the first pass always runs; later passes run only while something was deferred
+   do {
+     pending = conditionalUpdate(credentials, authorizations, pending, results);
+   } while (pending.size() > 0);
+
+   return results;
+ }
+
+
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
ThriftSecurityException {
Added: accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java (added)
+++ accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+
+/**
+ * A writer that will sometimes return unknown. When it returns unknown the condition may or may not have been written.
+ */
+public class FaultyConditionalWriter implements ConditionalWriter {
+
+ private ConditionalWriter cw;
+ private double up;
+ private Random rand;
+ private double wp;
+
+ public FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
+ this.cw = cw;
+ this.up = unknownProbability;
+ this.wp = writeProbability;
+ this.rand = new Random();
+
+ }
+
+ public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+ ArrayList<Result> resultList = new ArrayList<Result>();
+ ArrayList<ConditionalMutation> writes = new ArrayList<ConditionalMutation>();
+
+ while (mutations.hasNext()) {
+ ConditionalMutation cm = mutations.next();
+ if (rand.nextDouble() <= up && rand.nextDouble() > wp)
+ resultList.add(new Result(Status.UNKNOWN, cm));
+ else
+ writes.add(cm);
+ }
+
+ if (writes.size() > 0) {
+ Iterator<Result> results = cw.write(writes.iterator());
+
+ while (results.hasNext()) {
+ Result result = results.next();
+
+ if (rand.nextDouble() <= up && rand.nextDouble() <= wp)
+ result = new Result(Status.UNKNOWN, result.getMutation());
+ resultList.add(result);
+ }
+ }
+ return resultList.iterator();
+ }
+
+ public Result write(ConditionalMutation mutation) {
+ return write(Collections.singleton(mutation).iterator()).next();
+ }
+
+ public void setTimeout(long timeOut, TimeUnit timeUnit) {
+ cw.setTimeout(timeOut, timeUnit);
+ }
+
+ public long getTimeout(TimeUnit timeUnit) {
+ return cw.getTimeout(timeUnit);
+ }
+
+}
Modified: accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java Fri Jul 12 23:29:57 2013
@@ -40,7 +40,9 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.data.thrift.MultiScanResult;
import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TMutation;
@@ -200,6 +202,12 @@ public class NullTserver {
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveCompaction>();
}
+
+ /**
+  * No-op implementation: accepts the request and reports no results.
+  *
+  * @return an empty list, never null
+  */
+ @Override
+ public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
+     Map<TKeyExtent,List<TConditionalMutation>> mutations) throws TException {
+   // return an empty list like getActiveCompactions does; a null return from a thrift handler leaves the
+   // response without a result, which the caller sees as an error rather than as "nothing happened"
+   return new ArrayList<TCMResult>();
+ }
}
static class Opts extends Help {
Added: accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java (added)
+++ accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Result;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * Tests for {@link ConditionalWriter}, run against a single {@link MiniAccumuloCluster} shared by all test methods.
+ */
+public class ConditionalWriterTest {
+
+ private static String secret = "superSecret"; // passed to MiniAccumuloConfig and used as the "root" PasswordToken in every test
+ public static TemporaryFolder folder = new TemporaryFolder(); // created in setUpBeforeClass, deleted in tearDownAfterClass
+ public static MiniAccumuloCluster cluster; // started in setUpBeforeClass, stopped in tearDownAfterClass
+
+ /**
+  * Creates the temporary folder and starts the MiniAccumuloCluster that every test in this class connects to.
+  */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   folder.create();
+   cluster = new MiniAccumuloCluster(new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), secret));
+   cluster.start();
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ // Walks row "99006" of table "foo" through a sequence of conditional mutations and checks each returned status.
+ ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+ Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+ conn.tableOperations().create("foo");
+
+ ConditionalWriter cw = conn.createConditionalWriter("foo", Authorizations.EMPTY);
+
+ // mutation conditional on column tx:seq not existing
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+ cm0.put("name", "last", "doe");
+ cm0.put("name", "first", "john");
+ cm0.put("tx", "seq", "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); // tx:seq exists after the first write, so the same mutation is now rejected
+
+ // mutation conditional on column tx:seq being 1
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+ cm1.put("name", "last", "Doe");
+ cm1.put("tx", "seq", "2");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+
+ // test condition where value differs (tx:seq was set to 2 by cm1)
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+ cm2.put("name", "last", "DOE");
+ cm2.put("tx", "seq", "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+
+ // test condition where column does not exist
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1"));
+ cm3.put("name", "last", "deo");
+ cm3.put("tx", "seq", "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+
+ // test two conditions, where the second one (name:last, currently "Doe") should fail
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe"));
+ cm4.put("name", "last", "deo");
+ cm4.put("tx", "seq", "3");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+
+ // test two conditions, where the first one (tx:seq, currently "2") should fail
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe"));
+ cm5.put("name", "last", "deo");
+ cm5.put("tx", "seq", "3");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+ // ensure rejected mutations did not write
+ Scanner scanner = conn.createScanner("foo", Authorizations.EMPTY);
+ scanner.fetchColumn(new Text("name"), new Text("last"));
+ scanner.setRange(new Range("99006"));
+ Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+
+ // test w/ two conditions that are met
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe"));
+ cm6.put("name", "last", "DOE");
+ cm6.put("tx", "seq", "3");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+
+ Assert.assertEquals("DOE", scanner.iterator().next().getValue().toString());
+
+ // test a conditional mutation that deletes
+ ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3"));
+ cm7.putDelete("name", "last");
+ cm7.putDelete("name", "first");
+ cm7.putDelete("tx", "seq");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
+
+ Assert.assertFalse(scanner.iterator().hasNext());
+
+ // add the row back: cm0 is accepted once more because tx:seq was just deleted, then rejected again
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+ Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+
+ Assert.assertEquals("doe", scanner.iterator().next().getValue().toString());
+ }
+
+ @Test
+ public void testFields() throws Exception {
+ String table = "foo2";
+ // Checks that a wrong family, qualifier, visibility or timestamp in a Condition each causes a rejection.
+ ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+ Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+ conn.tableOperations().create(table);
+
+ Authorizations auths = new Authorizations("A", "B");
+
+ conn.securityOperations().changeUserAuthorizations("root", auths);
+
+ ConditionalWriter cw = conn.createConditionalWriter(table, auths);
+
+ ColumnVisibility cva = new ColumnVisibility("A");
+ ColumnVisibility cvb = new ColumnVisibility("B");
+ // initial write, conditional on tx:seq (visibility A) being absent
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva));
+ cm0.put("name", "last", cva, "doe");
+ cm0.put("name", "first", cva, "john");
+ cm0.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+
+ Scanner scanner = conn.createScanner(table, auths);
+ scanner.setRange(new Range("99006"));
+ // TODO verify all columns
+ scanner.fetchColumn(new Text("tx"), new Text("seq"));
+ Entry<Key,Value> entry = scanner.iterator().next();
+ Assert.assertEquals("1", entry.getValue().toString());
+ long ts = entry.getKey().getTimestamp(); // timestamp of the stored tx:seq entry, used by the timestamp conditions below
+
+ // test wrong colf
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1"));
+ cm1.put("name", "last", cva, "Doe");
+ cm1.put("name", "first", cva, "John");
+ cm1.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
+
+ // test wrong colq
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1"));
+ cm2.put("name", "last", cva, "Doe");
+ cm2.put("name", "first", cva, "John");
+ cm2.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+
+ // test wrong colv
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+ cm3.put("name", "last", cva, "Doe");
+ cm3.put("name", "first", cva, "John");
+ cm3.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+
+ // test wrong timestamp (one greater than the stored timestamp)
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
+ cm4.put("name", "last", cva, "Doe");
+ cm4.put("name", "first", cva, "John");
+ cm4.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+
+ // test wrong timestamp (one less than the stored timestamp)
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
+ cm5.put("name", "last", cva, "Doe");
+ cm5.put("name", "first", cva, "John");
+ cm5.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+ // ensure no updates were made
+ entry = scanner.iterator().next();
+ Assert.assertEquals("1", entry.getValue().toString());
+
+ // set all columns correctly
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
+ cm6.put("name", "last", cva, "Doe");
+ cm6.put("name", "first", cva, "John");
+ cm6.put("tx", "seq", cva, "2");
+ Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+
+ entry = scanner.iterator().next();
+ Assert.assertEquals("2", entry.getValue().toString());
+
+ // TODO test each field w/ absence
+
+ }
+
+ @Test
+ public void testBadColVis() throws Exception {
+ // test when a user sets a col vis in a condition that can never be seen
+ String table = "foo3";
+
+ ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+ Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+ conn.tableOperations().create(table);
+
+ Authorizations auths = new Authorizations("A", "B");
+
+ conn.securityOperations().changeUserAuthorizations("root", auths);
+ // root is granted A and B, but the writer below is created with A only
+ Authorizations filteredAuths = new Authorizations("A");
+
+ ConditionalWriter cw = conn.createConditionalWriter(table, filteredAuths);
+
+ ColumnVisibility cva = new ColumnVisibility("A");
+ ColumnVisibility cvb = new ColumnVisibility("B");
+ ColumnVisibility cvc = new ColumnVisibility("C");
+
+ // User has authorization, but didn't include it in the writer
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb));
+ cm0.put("name", "last", cva, "doe");
+ cm0.put("name", "first", cva, "john");
+ cm0.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus());
+ // same visibility, with the condition also requiring a value
+ ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+ cm1.put("name", "last", cva, "doe");
+ cm1.put("name", "first", cva, "john");
+ cm1.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus());
+
+ // User does not have the authorization
+ ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc));
+ cm2.put("name", "last", cva, "doe");
+ cm2.put("name", "first", cva, "john");
+ cm2.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus());
+ // same visibility, with the condition also requiring a value
+ ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1"));
+ cm3.put("name", "last", cva, "doe");
+ cm3.put("name", "first", cva, "john");
+ cm3.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus());
+
+ // if any visibility is bad, good visibilities don't override
+ ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva));
+
+ cm4.put("name", "last", cva, "doe");
+ cm4.put("name", "first", cva, "john");
+ cm4.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus());
+ // same pair of visibilities, both conditions requiring a value
+ ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq")
+ .setVisibility(cva).setValue("1"));
+ cm5.put("name", "last", cva, "doe");
+ cm5.put("name", "first", cva, "john");
+ cm5.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus());
+ // same pair of visibilities, value required only on the B (not visible to the writer) condition
+ ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
+ new Condition("tx", "seq").setVisibility(cva));
+ cm6.put("name", "last", cva, "doe");
+ cm6.put("name", "first", cva, "john");
+ cm6.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus());
+ // same pair of visibilities, value required only on the A (visible to the writer) condition
+ ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+ .setValue("1"));
+ cm7.put("name", "last", cva, "doe");
+ cm7.put("name", "first", cva, "john");
+ cm7.put("tx", "seq", cva, "1");
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
+ }
+
+ /**
+  * Ensures constraint violations are properly reported: a violating conditional mutation yields
+  * {@link Status#VIOLATED} and writes nothing, while a conforming one is accepted.
+  */
+ @Test
+ public void testConstraints() throws Exception {
+   String table = "foo5";
+   String cloneName = table + "_clone";
+
+   Connector conn = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getConnector("root", new PasswordToken(secret));
+
+   conn.tableOperations().create(table);
+   conn.tableOperations().addConstraint(table, AlphaNumKeyConstraint.class.getName());
+   // every read and write below targets the clone, not the table the constraint was added to
+   conn.tableOperations().clone(table, cloneName, true, new HashMap<String,String>(), new HashSet<String>());
+
+   Scanner scanner = conn.createScanner(cloneName, new Authorizations());
+   ConditionalWriter writer = conn.createConditionalWriter(cloneName, new Authorizations());
+
+   // row "99006+" is expected to violate AlphaNumKeyConstraint
+   ConditionalMutation violating = new ConditionalMutation("99006+", new Condition("tx", "seq"));
+   violating.put("tx", "seq", "1");
+   Status violatingStatus = writer.write(violating).getStatus();
+   Assert.assertEquals(Status.VIOLATED, violatingStatus);
+   Assert.assertFalse(scanner.iterator().hasNext());
+
+   // the same mutation on a conforming row goes through
+   ConditionalMutation conforming = new ConditionalMutation("99006", new Condition("tx", "seq"));
+   conforming.put("tx", "seq", "1");
+   Status conformingStatus = writer.write(conforming).getStatus();
+   Assert.assertEquals(Status.ACCEPTED, conformingStatus);
+   Assert.assertTrue(scanner.iterator().hasNext());
+ }
+
+ /**
+  * Verifies that iterators set on a {@link Condition} are used when the condition is checked: a condition on the
+  * summed value "3" is rejected without the SummingCombiner and accepted with it.
+  */
+ @Test
+ public void testIterators() throws Exception {
+   String table = "foo4";
+
+   ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+   Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+   // NOTE(review): the 'false' argument presumably creates the table without the default versioning
+   // iterator, so that all identical writes below are retained - confirm against TableOperations.create
+   conn.tableOperations().create(table, false);
+
+   BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+
+   // the same mutation is added three times; the summing scan below expects to see "3"
+   Mutation m = new Mutation("ACCUMULO-1000");
+   m.put("count", "comments", "1");
+   bw.addMutation(m);
+   bw.addMutation(m);
+   bw.addMutation(m);
+   bw.close();
+
+   IteratorSetting iterConfig = new IteratorSetting(10, SummingCombiner.class);
+   SummingCombiner.setEncodingType(iterConfig, Type.STRING);
+   SummingCombiner.setColumns(iterConfig, Collections.singletonList(new IteratorSetting.Column("count")));
+
+   Scanner scanner = conn.createScanner(table, new Authorizations());
+   scanner.addScanIterator(iterConfig);
+   scanner.setRange(new Range("ACCUMULO-1000"));
+   scanner.fetchColumn(new Text("count"), new Text("comments"));
+
+   Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
+
+   ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations());
+
+   // condition on the summed value, but without the combiner configured on the condition: rejected
+   ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
+   cm0.put("count", "comments", "1");
+   Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+   Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
+
+   // same condition with the combiner attached: accepted, and the summed value becomes "4"
+   ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3"));
+   cm1.put("count", "comments", "1");
+   Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+   Assert.assertEquals("4", scanner.iterator().next().getValue().toString());
+
+   // condition on the new sum "4", again without the combiner: rejected.
+   // The original code wrote cm1 a second time here and left cm2 unused, so this case was never exercised.
+   ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4"));
+   cm2.put("count", "comments", "1");
+   Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+   Assert.assertEquals("4", scanner.iterator().next().getValue().toString());
+
+   // TODO test conditions with different iterators
+   // TODO test w/ table that has iterators configured
+ }
+
+ /**
+  * Writes two batches of conditional mutations through the iterator-based write call, one before and one after
+  * splits are added to the table, and checks the per-mutation statuses and the resulting table contents.
+  */
+ @Test
+ public void testBatch() throws Exception {
+   String table = "foo6";
+
+   Connector conn = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getConnector("root", new PasswordToken(secret));
+
+   conn.tableOperations().create(table);
+
+   conn.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B"));
+
+   ColumnVisibility aOrB = new ColumnVisibility("A|B");
+
+   ArrayList<ConditionalMutation> batch = new ArrayList<ConditionalMutation>();
+
+   // first batch: three rows, each conditional on tx:seq being absent
+   ConditionalMutation john = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(aOrB));
+   john.put("name", "last", aOrB, "doe");
+   john.put("name", "first", aOrB, "john");
+   john.put("tx", "seq", aOrB, "1");
+   batch.add(john);
+
+   ConditionalMutation jane = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(aOrB));
+   jane.put("name", "last", aOrB, "doe");
+   jane.put("name", "first", aOrB, "jane");
+   jane.put("tx", "seq", aOrB, "1");
+   batch.add(jane);
+
+   ConditionalMutation jack = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(aOrB));
+   jack.put("name", "last", aOrB, "doe");
+   jack.put("name", "first", aOrB, "jack");
+   jack.put("tx", "seq", aOrB, "1");
+   batch.add(jack);
+
+   ConditionalWriter writer = conn.createConditionalWriter(table, new Authorizations("A"));
+
+   int firstBatchResults = 0;
+   for (Iterator<Result> it = writer.write(batch.iterator()); it.hasNext();) {
+     Assert.assertEquals(Status.ACCEPTED, it.next().getStatus());
+     firstBatchResults++;
+   }
+
+   Assert.assertEquals(3, firstBatchResults);
+
+   Scanner scanner = conn.createScanner(table, new Authorizations("A"));
+   scanner.fetchColumn(new Text("tx"), new Text("seq"));
+
+   String[] allRows = {"99006", "59056", "19059"};
+   for (int i = 0; i < allRows.length; i++) {
+     scanner.setRange(new Range(allRows[i]));
+     Assert.assertEquals("1", scanner.iterator().next().getValue().toString());
+   }
+
+   // add splits before the second batch is written
+   TreeSet<Text> splitPoints = new TreeSet<Text>();
+   splitPoints.add(new Text("3"));
+   splitPoints.add(new Text("7"));
+   conn.tableOperations().addSplits(table, splitPoints);
+
+   batch.clear();
+
+   // second batch: only the condition on row 99006 (tx:seq == "1") matches the stored data
+   ConditionalMutation johnUpdate = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(aOrB).setValue("1"));
+   johnUpdate.put("name", "last", aOrB, "Doe");
+   johnUpdate.put("tx", "seq", aOrB, "2");
+   batch.add(johnUpdate);
+
+   ConditionalMutation janeUpdate = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(aOrB));
+   janeUpdate.put("name", "last", aOrB, "Doe");
+   janeUpdate.put("tx", "seq", aOrB, "1");
+   batch.add(janeUpdate);
+
+   ConditionalMutation jackUpdate = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(aOrB).setValue("2"));
+   jackUpdate.put("name", "last", aOrB, "Doe");
+   jackUpdate.put("tx", "seq", aOrB, "3");
+   batch.add(jackUpdate);
+
+   int accepted = 0;
+   int rejected = 0;
+   for (Iterator<Result> it = writer.write(batch.iterator()); it.hasNext();) {
+     Result res = it.next();
+     boolean isUpdatedRow = "99006".equals(new String(res.getMutation().getRow()));
+     if (isUpdatedRow) {
+       Assert.assertEquals(Status.ACCEPTED, res.getStatus());
+       accepted++;
+     } else {
+       Assert.assertEquals(Status.REJECTED, res.getStatus());
+       rejected++;
+     }
+   }
+
+   Assert.assertEquals(1, accepted);
+   Assert.assertEquals(2, rejected);
+
+   // the two rejected rows still carry their original sequence number
+   String[] untouchedRows = {"59056", "19059"};
+   for (int i = 0; i < untouchedRows.length; i++) {
+     scanner.setRange(new Range(untouchedRows[i]));
+     Assert.assertEquals("1", scanner.iterator().next().getValue().toString());
+   }
+
+   scanner.setRange(new Range("99006"));
+   Assert.assertEquals("2", scanner.iterator().next().getValue().toString());
+
+   scanner.clearColumns();
+   scanner.fetchColumn(new Text("name"), new Text("last"));
+   Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+ }
+
+ /**
+  * Writes 100 conditional mutations with random rows to a pre-split table in one batch, then updates all of them in a
+  * second batch, expecting every mutation to be accepted both times.
+  */
+ @Test
+ public void testBigBatch() throws Exception {
+
+   String table = "foo100";
+
+   ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+   Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+   conn.tableOperations().create(table);
+   conn.tableOperations().addSplits(table, nss("2", "4", "6"));
+
+   // NOTE(review): fixed two second pause after adding splits - presumably lets the splits take effect; confirm
+   UtilWaitThread.sleep(2000);
+
+   int num = 100;
+
+   ArrayList<byte[]> rows = new ArrayList<byte[]>(num);
+   ArrayList<ConditionalMutation> cml = new ArrayList<ConditionalMutation>(num);
+
+   Random r = new Random();
+   byte[] e = new byte[0];
+
+   for (int i = 0; i < num; i++) {
+     // Clear the sign bit instead of calling Math.abs(): Math.abs(Long.MIN_VALUE) is still negative,
+     // which would hand a negative number to the zero-padding formatter.
+     rows.add(FastFormat.toZeroPaddedString(r.nextLong() & Long.MAX_VALUE, 16, 16, e));
+   }
+
+   // first batch: create every row, conditional on meta:seq being absent
+   for (int i = 0; i < num; i++) {
+     ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq"));
+
+     cm.put("meta", "seq", "1");
+     cm.put("meta", "tx", UUID.randomUUID().toString());
+
+     cml.add(cm);
+   }
+
+   ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+
+   Iterator<Result> results = cw.write(cml.iterator());
+
+   int count = 0;
+
+   // TODO check got each row back
+   while (results.hasNext()) {
+     Result result = results.next();
+     Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+     count++;
+   }
+
+   Assert.assertEquals(num, count);
+
+   // second batch: update every row, conditional on meta:seq still being "1"
+   ArrayList<ConditionalMutation> cml2 = new ArrayList<ConditionalMutation>(num);
+
+   for (int i = 0; i < num; i++) {
+     ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1"));
+
+     cm.put("meta", "seq", "2");
+     cm.put("meta", "tx", UUID.randomUUID().toString());
+
+     cml2.add(cm);
+   }
+
+   count = 0;
+
+   results = cw.write(cml2.iterator());
+
+   while (results.hasNext()) {
+     Result result = results.next();
+     Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+     count++;
+   }
+
+   Assert.assertEquals(num, count);
+ }
+
+ @Test
+ public void testBatchErrors() throws Exception {
+ // Writes one batch of four mutations and expects four different statuses: ACCEPTED, INVISIBLE_VISIBILITY, VIOLATED, REJECTED.
+ String table = "foo7";
+
+ ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+ Connector conn = zki.getConnector("root", new PasswordToken(secret));
+
+ conn.tableOperations().create(table);
+ conn.tableOperations().addConstraint(table, AlphaNumKeyConstraint.class.getName());
+ conn.tableOperations().clone(table, table + "_clone", true, new HashMap<String,String>(), new HashSet<String>());
+ // NOTE(review): the clone created above is never used below, whereas testConstraints writes to its clone - confirm which table is intended
+ conn.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B"));
+
+ ColumnVisibility cvaob = new ColumnVisibility("A|B");
+ ColumnVisibility cvaab = new ColumnVisibility("A&B");
+ // randomly leave the table unsplit (case 0), add one split, or add two splits
+ switch ((new Random()).nextInt(3)) {
+ case 1:
+ conn.tableOperations().addSplits(table, nss("6"));
+ break;
+ case 2:
+ conn.tableOperations().addSplits(table, nss("2", "95"));
+ break;
+ }
+
+ ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+ // VIOLATED is asserted for this row below; the column family "name+" is presumably what AlphaNumKeyConstraint rejects
+ ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvaob));
+ cm0.put("name+", "last", cvaob, "doe");
+ cm0.put("name", "first", cvaob, "john");
+ cm0.put("tx", "seq", cvaob, "1");
+ mutations.add(cm0);
+ // INVISIBLE_VISIBILITY is asserted for this row below; its condition uses A&B while the writer is created with "A" only
+ ConditionalMutation cm1 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvaab));
+ cm1.put("name", "last", cvaab, "doe");
+ cm1.put("name", "first", cvaab, "jane");
+ cm1.put("tx", "seq", cvaab, "1");
+ mutations.add(cm1);
+ // ACCEPTED is asserted for this row below; its condition only requires tx:seq to be absent
+ ConditionalMutation cm2 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvaob));
+ cm2.put("name", "last", cvaob, "doe");
+ cm2.put("name", "first", cvaob, "jack");
+ cm2.put("tx", "seq", cvaob, "1");
+ mutations.add(cm2);
+ // REJECTED is asserted for this row below; its condition requires tx:seq == "1" on a row this test never wrote
+ ConditionalMutation cm3 = new ConditionalMutation("90909", new Condition("tx", "seq").setVisibility(cvaob).setValue("1"));
+ cm3.put("name", "last", cvaob, "doe");
+ cm3.put("name", "first", cvaob, "john");
+ cm3.put("tx", "seq", cvaob, "2");
+ mutations.add(cm3);
+
+ ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+ Iterator<Result> results = cw.write(mutations.iterator());
+ HashSet<String> rows = new HashSet<String>(); // distinct rows for which a result came back
+ while (results.hasNext()) {
+ Result result = results.next();
+ String row = new String(result.getMutation().getRow());
+ if (row.equals("19059")) {
+ Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+ } else if (row.equals("59056")) {
+ Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus());
+ } else if (row.equals("99006")) {
+ Assert.assertEquals(Status.VIOLATED, result.getStatus());
+ } else if (row.equals("90909")) {
+ Assert.assertEquals(Status.REJECTED, result.getStatus());
+ }
+ rows.add(row);
+ }
+
+ Assert.assertEquals(4, rows.size());
+ // only one mutation was accepted, so the unrestricted scan should return exactly one tx:seq entry
+ Scanner scanner = conn.createScanner(table, new Authorizations("A"));
+ scanner.fetchColumn(new Text("tx"), new Text("seq"));
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ Assert.assertEquals("1", iter.next().getValue().toString());
+ Assert.assertFalse(iter.hasNext());
+
+ }
+
+ /**
+  * Tests multiple mutations for the same row in the same batch: of three mutations that all require
+  * tx:seq == "1", exactly one is expected to be accepted and the other two rejected.
+  */
+ @Test
+ public void testSameRow() throws Exception {
+   String table = "foo8";
+
+   Connector conn = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getConnector("root", new PasswordToken(secret));
+
+   conn.tableOperations().create(table);
+
+   ConditionalWriter writer = conn.createConditionalWriter(table, Authorizations.EMPTY);
+
+   // seed the row so that tx:seq == "1"
+   ConditionalMutation seed = new ConditionalMutation("r1", new Condition("tx", "seq"));
+   seed.put("tx", "seq", "1");
+   seed.put("data", "x", "a");
+
+   Assert.assertEquals(Status.ACCEPTED, writer.write(seed).getStatus());
+
+   // three competing updates that differ only in the data:x value they write
+   ArrayList<ConditionalMutation> competing = new ArrayList<ConditionalMutation>();
+   for (String data : new String[] {"b", "c", "d"}) {
+     ConditionalMutation update = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+     update.put("tx", "seq", "2");
+     update.put("data", "x", data);
+     competing.add(update);
+   }
+
+   int accepted = 0;
+   int rejected = 0;
+   int total = 0;
+
+   for (Iterator<Result> it = writer.write(competing.iterator()); it.hasNext();) {
+     Status status = it.next().getStatus();
+     if (status == Status.ACCEPTED) {
+       accepted++;
+     } else if (status == Status.REJECTED) {
+       rejected++;
+     }
+     total++;
+   }
+
+   Assert.assertEquals(1, accepted);
+   Assert.assertEquals(2, rejected);
+   Assert.assertEquals(3, total);
+ }
+
+ /**
+  * Builds a sorted set of {@link Text} split points from the given strings.
+  */
+ private SortedSet<Text> nss(String... splits) {
+   TreeSet<Text> splitPoints = new TreeSet<Text>();
+   for (int i = 0; i < splits.length; i++) {
+     splitPoints.add(new Text(splits[i]));
+   }
+   return splitPoints;
+ }
+
+ @Test
+ public void testSecurity() { // placeholder: empty body, so this test currently passes without checking anything
+ // TODO test against a table the user does not have read and/or write permissions for
+ }
+
+ @Test
+ public void testTimeout() { // placeholder: empty body, so this test currently passes without checking anything
+ // TODO not implemented yet
+ }
+
+ @Test
+ public void testOffline() { // placeholder: empty body, so this test currently passes without checking anything
+ // TODO test against an offline table
+ }
+
+ /**
+  * Stops the shared cluster and removes the temporary folder.
+  *
+  * @throws Exception if stopping the cluster fails; the temporary folder is deleted regardless
+  */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   try {
+     // cluster stays null when setUpBeforeClass failed before assigning it
+     if (cluster != null) {
+       cluster.stop();
+     }
+   } finally {
+     // always clean up the on-disk state, even if stop() throws
+     folder.delete();
+   }
+ }
+}