You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/11/19 23:16:09 UTC
git commit: ACCUMULO-1613 added documentation for conditional writer
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 2faafcce4 -> 739718253
ACCUMULO-1613 added documentation for conditional writer
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73971825
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73971825
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73971825
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 739718253bd49dbe5c74e041b5910cfd12edad29
Parents: 2faafcc
Author: Keith Turner <kt...@apache.org>
Authored: Tue Nov 19 17:15:23 2013 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Nov 19 17:15:23 2013 -0500
----------------------------------------------------------------------
.../accumulo/core/client/ConditionalWriter.java | 3 +
.../apache/accumulo/core/data/Condition.java | 54 ++++
.../accumulo/core/data/ConditionalMutation.java | 2 +
.../accumulo_user_manual/chapters/clients.tex | 27 +-
.../examples/simple/reservations/ARS.java | 308 +++++++++++++++++++
.../resources/docs/examples/README.reservations | 66 ++++
6 files changed, 459 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 4ed4d31..2c24a2e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -23,6 +23,9 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.data.ConditionalMutation;
/**
+ * ConditionalWriter provides the ability to do efficient, atomic read-modify-write operations on rows. These operations are performed on the tablet server
+ * while a row lock is held.
+ *
* @since 1.6.0
*/
public interface ConditionalWriter {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/Condition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Condition.java b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
index df20682..fc8f2bf 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Condition.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.hadoop.io.Text;
/**
+ * Conditions that must be met on a particular column in a row.
*
* @since 1.6.0
*/
@@ -76,6 +77,13 @@ public class Condition {
return cq;
}
+ /**
+ * Sets the version for the column to check. If this is not set then the latest column will be checked, unless iterators do something different.
+ *
+ * @param ts
+ * @return returns this
+ */
+
public Condition setTimestamp(long ts) {
this.ts = ts;
return this;
@@ -85,24 +93,53 @@ public class Condition {
return ts;
}
+ /**
+ * see {@link #setValue(byte[])}
+ *
+ * @param value
+ * @return returns this
+ */
+
public Condition setValue(CharSequence value) {
ArgumentChecker.notNull(value);
this.val = new ArrayByteSequence(value.toString().getBytes(Constants.UTF8));
return this;
}
+ /**
+ * This method sets the expected value of a column. In order for the condition to pass, the column must exist and have this value. If a value is not set, then
+ * the column must be absent for the condition to pass.
+ *
+ * @param value
+ * @return returns this
+ */
+
public Condition setValue(byte[] value) {
ArgumentChecker.notNull(value);
this.val = new ArrayByteSequence(value);
return this;
}
+ /**
+ * see {@link #setValue(byte[])}
+ *
+ * @param value
+ * @return returns this
+ */
+
public Condition setValue(Text value) {
ArgumentChecker.notNull(value);
this.val = new ArrayByteSequence(value.getBytes(), 0, value.getLength());
return this;
}
+ /**
+ * see {@link #setValue(byte[])}
+ *
+ * @param value
+ * @return returns this
+ */
+
public Condition setValue(ByteSequence value) {
ArgumentChecker.notNull(value);
this.val = value;
@@ -113,6 +150,13 @@ public class Condition {
return val;
}
+ /**
+ * Sets the visibility for the column to check. If not set it defaults to empty visibility.
+ *
+ * @param cv
+ * @return returns this
+ */
+
public Condition setVisibility(ColumnVisibility cv) {
ArgumentChecker.notNull(cv);
this.cv = new ArrayByteSequence(cv.getExpression());
@@ -123,6 +167,16 @@ public class Condition {
return cv;
}
+ /**
+ * Set iterators to use when reading the column's value. These iterators will be applied in addition to the iterators configured for the table. Using iterators,
+ * it's possible to test other conditions besides equality and absence, like less than. On the server side the iterators will be seeked using a range that
+ * covers only the family, qualifier, and visibility (if the timestamp is set then it will be used to narrow the range). Value equality will be tested using
+ * the first entry returned by the iterator stack.
+ *
+ * @param iterators
+ * @return returns this
+ */
+
public Condition setIterators(IteratorSetting... iterators) {
ArgumentChecker.notNull(iterators);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
index 510396d..c438f6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
@@ -26,6 +26,8 @@ import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.hadoop.io.Text;
/**
+ * A Mutation that contains a list of conditions that must all be met before the mutation is applied.
+ *
* @since 1.6.0
*/
public class ConditionalMutation extends Mutation {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
----------------------------------------------------------------------
diff --git a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
index 18fbafc..9b35d37 100644
--- a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
+++ b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
@@ -116,6 +116,31 @@ writer.close();
An example of using the batch writer can be found at\\
accumulo/docs/examples/README.batch
+\subsection{ConditionalWriter}
+The ConditionalWriter enables efficient, atomic read-modify-write operations on
+rows. The ConditionalWriter writes special Mutations which have a list of per
+column conditions that must all be met before the mutation is applied. The
+conditions are checked in the tablet server while a row lock is
+held\footnote{Mutations written by the BatchWriter will not obtain a row
+lock.}. The conditions that can be checked for a column are equality and
+absence. For example, a conditional mutation can require that column A is
+absent in order to be applied. Iterators can be applied when checking
+conditions. Using iterators, many other operations besides equality and
+absence can be checked. For example, using an iterator that converts values
+less than 5 to 0 and everything else to 1, it's possible to only apply a
+mutation when a column is less than 5.
+
+In the case when a tablet server dies after a client sent a conditional
+mutation, it's not known if the mutation was applied or not. When this happens
+the ConditionalWriter reports a status of UNKNOWN for the ConditionalMutation.
+In many cases this situation can be dealt with by simply reading the row again
+and possibly sending another conditional mutation. If this is not sufficient,
+then a higher level of abstraction can be built by storing transactional
+information within a row.
+
+An example of using the conditional writer can be found at\\
+accumulo/docs/examples/README.reservations
+
\section{Reading Data}
Accumulo is optimized to quickly retrieve the value associated with a given key, and
@@ -329,4 +354,4 @@ for(KeyValue keyValue : results.getResultsIterator()) {
client.closeScanner(scanner);
\end{verbatim}
-\normalsize
\ No newline at end of file
+\normalsize
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
new file mode 100644
index 0000000..0c51843
--- /dev/null
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.reservations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import jline.console.ConsoleReader;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait lists are also
+ * supported. In order to keep the example simple, no checking is done of the date. Also, the code is inefficient; if interested in improving it, take a look at the
+ * EXCERCISE comments.
+ */
+
+// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads making reservations against N resources. Each thread could
+// randomly reserve and cancel resources for a single user. When each thread finishes, it knows what the state of its single user should be. When all threads
+// finish, collect their expected state and verify the status of all users and resources. For extra credit, run the test on an IaaS provider using 10 nodes and
+// 10 threads per node.
+
+public class ARS {
+
+ private Connector conn;
+ private String rTable;
+
+ public enum ReservationResult {
+ RESERVED, WAIT_LISTED
+ }
+
+ public ARS(Connector conn, String rTable) {
+ this.conn = conn;
+ this.rTable = rTable;
+ }
+
+ public List<String> setCapacity(String what, String when, int count) {
+ // EXCERCISE implement this method which atomically sets a capacity and returns anyone who was moved to the waitlist if the capacity was decreased
+
+ throw new UnsupportedOperationException();
+ }
+
+ public ReservationResult reserve(String what, String when, String who) throws Exception {
+
+ String row = what + ":" + when;
+
+ // EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation exists then the update will fail. This is a good strategy
+ // when it's expected there are usually no reservations. Could modify the code to scan first.
+
+ // The following mutation requires that the column tx:seq does not exist and will fail if it does.
+ ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq"));
+ update.put("tx", "seq", "0");
+ update.put("res", String.format("%04d", 0), who);
+
+ ReservationResult result = ReservationResult.RESERVED;
+
+ ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+
+ try {
+ while (true) {
+ Status status = cwriter.write(update).getStatus();
+ switch (status) {
+ case ACCEPTED:
+ return result;
+ case REJECTED:
+ case UNKNOWN:
+ // read the row and decide what to do
+ break;
+ default:
+ throw new RuntimeException("Unexpected status " + status);
+ }
+
+ // EXCERCISE in the case of many threads trying to reserve a slot, this approach of immediately retrying is inefficient. Exponential backoff is a good
+ // general solution to solve contention problems like this. However in this particular case exponential backoff could penalize the earliest threads that
+ // attempted to make a reservation putting them later in the list. A more complex solution could involve having independent sub-queues within the row
+ // that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.
+
+ // it's important to use an isolated scanner so that only whole mutations are seen
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
+ scanner.setRange(new Range(row));
+
+ int seq = -1;
+ int maxReservation = -1;
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ if (cf.equals("tx") && cq.equals("seq")) {
+ seq = Integer.parseInt(val);
+ } else if (cf.equals("res")) {
+ // EXCERCISE scanning the entire list to find if reserver is already in the list is inefficient. One possible way to solve this would be to sort the
+ // data differently in Accumulo so that finding the reserver could be done quickly.
+ if (val.equals(who))
+ if (maxReservation == -1)
+ return ReservationResult.RESERVED; // already have the first reservation
+ else
+ return ReservationResult.WAIT_LISTED; // already on wait list
+
+ // EXCERCISE the way this code finds the max reservation is very inefficient.... it would be better if it did not have to scan the entire row.
+ // One possibility is to just use the sequence number. Could also consider sorting the data in another way and/or using an iterator.
+ maxReservation = Integer.parseInt(cq);
+ }
+ }
+
+ Condition condition = new Condition("tx", "seq");
+ if (seq >= 0)
+ condition.setValue(seq + ""); // only expect a seq # if one was seen
+
+ update = new ConditionalMutation(row, condition);
+ update.put("tx", "seq", (seq + 1) + "");
+ update.put("res", String.format("%04d", maxReservation + 1), who);
+
+ // EXCERCISE if set capacity is implemented, then result should take capacity into account
+ if (maxReservation == -1)
+ result = ReservationResult.RESERVED; // if successful, will be first reservation
+ else
+ result = ReservationResult.WAIT_LISTED;
+ }
+ } finally {
+ cwriter.close();
+ }
+
+ }
+
+ public void cancel(String what, String when, String who) throws Exception {
+
+ String row = what + ":" + when;
+
+ // Even though this method is only deleting a column, it's important to use a conditional writer. By updating the seq # when deleting a reservation, it
+ // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
+ // when it actually got the reservation.
+
+ ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+
+ try {
+ while (true) {
+
+ // it's important to use an isolated scanner so that only whole mutations are seen
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
+ scanner.setRange(new Range(row));
+
+ int seq = -1;
+ String reservation = null;
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ // EXCERCISE avoid linear scan
+
+ if (cf.equals("tx") && cq.equals("seq")) {
+ seq = Integer.parseInt(val);
+ } else if (cf.equals("res") && val.equals(who)) {
+ reservation = cq;
+ }
+ }
+
+ if (reservation != null) {
+ ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq").setValue(seq + ""));
+ update.putDelete("res", reservation);
+ update.put("tx", "seq", (seq + 1) + "");
+
+ Status status = cwriter.write(update).getStatus();
+ switch (status) {
+ case ACCEPTED:
+ // successfully canceled reservation
+ return;
+ case REJECTED:
+ case UNKNOWN:
+ // retry
+ // EXCERCISE exponential backoff could be used here
+ break;
+ default:
+ throw new RuntimeException("Unexpected status " + status);
+ }
+
+ } else {
+ // not reserved, nothing to do
+ break;
+ }
+
+ }
+ } finally {
+ cwriter.close();
+ }
+ }
+
+ public List<String> list(String what, String when) throws Exception {
+ String row = what + ":" + when;
+
+ // it's important to use an isolated scanner so that only whole mutations are seen
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
+ scanner.setRange(new Range(row));
+ scanner.fetchColumnFamily(new Text("res"));
+
+ List<String> reservations = new ArrayList<String>();
+
+ for (Entry<Key,Value> entry : scanner) {
+ String val = entry.getValue().toString();
+ reservations.add(val);
+ }
+
+ return reservations;
+ }
+
+ public static void main(String[] args) throws Exception {
+ final ConsoleReader reader = new ConsoleReader();
+ ARS ars = null;
+
+ while (true) {
+ String line = reader.readLine(">");
+ if (line == null)
+ break;
+
+ final String[] tokens = line.split("\\s+");
+
+ if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
+ // start up multiple threads all trying to reserve the same resource, no more than one should succeed
+
+ final ARS fars = ars;
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+ for (int i = 3; i < tokens.length; i++) {
+ final int whoIndex = i;
+ Runnable reservationTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ reader.println(" " + String.format("%20s", tokens[whoIndex]) + " : " + fars.reserve(tokens[1], tokens[2], tokens[whoIndex]));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threads.add(new Thread(reservationTask));
+ }
+
+ for (Thread thread : threads)
+ thread.start();
+
+ for (Thread thread : threads)
+ thread.join();
+
+ } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) {
+ ars.cancel(tokens[1], tokens[2], tokens[3]);
+ } else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) {
+ List<String> reservations = ars.list(tokens[1], tokens[2]);
+ if (reservations.size() > 0) {
+ reader.println(" Reservation holder : " + reservations.get(0));
+ if (reservations.size() > 1)
+ reader.println(" Wait list : " + reservations.subList(1, reservations.size()));
+ }
+ } else if (tokens[0].equals("quit") && tokens.length == 1) {
+ break;
+ } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) {
+ ZooKeeperInstance zki = new ZooKeeperInstance(tokens[1], tokens[2]);
+ Connector conn = zki.getConnector(tokens[3], new PasswordToken(tokens[4]));
+ if (conn.tableOperations().exists(tokens[5])) {
+ ars = new ARS(conn, tokens[5]);
+ reader.println(" connected");
+ } else
+ reader.println(" No Such Table");
+ } else {
+ System.out.println(" Commands : ");
+ if (ars == null) {
+ reader.println(" connect <instance> <zookeepers> <user> <pass> <table>");
+ } else {
+ reader.println(" reserve <what> <when> <who> {who}");
+ reader.println(" cancel <what> <when> <who>");
+ reader.println(" list <what> <when>");
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/server/monitor/src/main/resources/docs/examples/README.reservations
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/resources/docs/examples/README.reservations b/server/monitor/src/main/resources/docs/examples/README.reservations
new file mode 100644
index 0000000..7ad9cd3
--- /dev/null
+++ b/server/monitor/src/main/resources/docs/examples/README.reservations
@@ -0,0 +1,66 @@
+Title: Apache Accumulo Reservations Example
+Notice: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ .
+ http://www.apache.org/licenses/LICENSE-2.0
+ .
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+This example shows running a simple reservation system implemented using
+conditional mutations. This system guarantees that only one concurrent user can
+reserve a resource. The example's reserve command allows multiple users to be
+specified. When this is done it creates a separate reservation thread for each
+user. In the example below threads are spun up for alice, bob, eve, mallory,
+and trent to reserve room06 on 20140101. Bob ends up getting the reservation
+and everyone else is put on a wait list. The example code will take any string
+for what, when and who.
+
+ $ ./bin/accumulo org.apache.accumulo.examples.simple.reservations.ARS
+ >connect test16 localhost root secret ars
+ connected
+ >
+ Commands :
+ reserve <what> <when> <who> {who}
+ cancel <what> <when> <who>
+ list <what> <when>
+ >reserve room06 20140101 alice bob eve mallory trent
+ bob : RESERVED
+ mallory : WAIT_LISTED
+ alice : WAIT_LISTED
+ trent : WAIT_LISTED
+ eve : WAIT_LISTED
+ >list room06 20140101
+ Reservation holder : bob
+ Wait list : [mallory, alice, trent, eve]
+ >cancel room06 20140101 alice
+ >cancel room06 20140101 bob
+ >list room06 20140101
+ Reservation holder : mallory
+ Wait list : [trent, eve]
+ >quit
+
+Scanning the table in the Accumulo shell after running the example shows the
+following:
+
+ root@test16> table ars
+ root@test16 ars> scan
+ room06:20140101 res:0001 [] mallory
+ room06:20140101 res:0003 [] trent
+ room06:20140101 res:0004 [] eve
+ room06:20140101 tx:seq [] 6
+
+The tx:seq column is incremented for each update to the row allowing for
+detection of concurrent changes. For an update to go through the sequence
+number must not have changed since the data was read. If it does change then
+the conditional mutation will fail and the example code will retry.
+