You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2016/12/09 17:12:16 UTC
[3/7] accumulo-examples git commit: ACCUMULO-4511 Adding examples
from Accumulo repo
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
new file mode 100644
index 0000000..0c8d1ae
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Base64;
+import java.util.Collection;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Example map reduce job that bulk ingests data into an Accumulo table. The expected input is text files containing one tab-separated key/value pair per line.
+ */
+public class BulkIngestExample extends Configured implements Tool {
+ /**
+  * Mapper that cuts each input line at its first tab character and emits the bytes before the tab as the key and the bytes after it as the value. Lines
+  * without a tab, or whose first byte is a tab, produce no output.
+  */
+ public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
+   private Text outputKey = new Text();
+   private Text outputValue = new Text();
+
+   @Override
+   public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
+     // only the first getLength() bytes of the backing array are valid
+     final byte[] line = value.getBytes();
+     final int length = value.getLength();
+
+     // advance to the first tab, or to the end of the line if there is none
+     int tabAt = 0;
+     while (tabAt < length && line[tabAt] != '\t') {
+       tabAt++;
+     }
+
+     // nothing to emit when there is no tab or the key part would be empty
+     if (tabAt == length || tabAt == 0) {
+       return;
+     }
+
+     final int valueStart = tabAt + 1;
+     outputKey.set(line, 0, tabAt);
+     outputValue.set(line, valueStart, length - valueStart);
+     output.write(outputKey, outputValue);
+   }
+ }
+
+ /**
+  * Reducer that turns every value received for a row into one Accumulo entry in column family "colf", using a zero padded running counter
+  * ("col_0000000", "col_0000001", ...) as the column qualifier.
+  */
+ public static class ReduceClass extends Reducer<Text,Text,Key,Value> {
+   @Override
+   public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
+     // All entries of this row share one wall clock timestamp. Beware of clock skew: if the table already holds a value with a later timestamp the
+     // new data will not be visible, so keep ntp running on the cluster, or consider logical time or letting accumulo set the time.
+     final long timestamp = System.currentTimeMillis();
+
+     int seen = 0;
+     for (Text value : values) {
+       Text family = new Text("colf");
+       Text qualifier = new Text(String.format("col_%07d", seen));
+       seen++;
+
+       Key cell = new Key(key, family, qualifier, timestamp);
+       output.write(cell, new Value(value.getBytes(), 0, value.getLength()));
+     }
+   }
+ }
+
+ /**
+  * Command line options of the bulk ingest job, in addition to those inherited from MapReduceClientOnRequiredTable.
+  */
+ static class Opts extends MapReduceClientOnRequiredTable {
+ // directory used as the input path of the map reduce job
+ @Parameter(names = "--inputDir", required = true)
+ String inputDir;
+ // working directory; the job writes "files", "splits.txt" and "failures" beneath it
+ @Parameter(names = "--workDir", required = true)
+ String workDir;
+ }
+
+ /**
+  * Configures and runs the bulk ingest map reduce job, then imports the generated files into the target table.
+  *
+  * <p>
+  * The table's current split points (at most 100) are written, Base64 encoded, to {@code <workDir>/splits.txt} and handed to the RangePartitioner. The
+  * job output goes to {@code <workDir>/files} and is imported with {@code <workDir>/failures} as the failure directory.
+  *
+  * @param args
+  *          command line arguments, parsed into {@link Opts}
+  * @return 0 when the job succeeded and the import was requested, 1 when the map reduce job failed (nothing is imported in that case)
+  * @throws RuntimeException
+  *           wrapping any exception raised while setting up, running or importing
+  */
+ @Override
+ public int run(String[] args) {
+   Opts opts = new Opts();
+   opts.parseArgs(BulkIngestExample.class.getName(), args);
+
+   Configuration conf = getConf();
+   try {
+     Job job = Job.getInstance(conf);
+     job.setJobName("bulk ingest example");
+     job.setJarByClass(this.getClass());
+
+     job.setInputFormatClass(TextInputFormat.class);
+
+     job.setMapperClass(MapClass.class);
+     job.setMapOutputKeyClass(Text.class);
+     job.setMapOutputValueClass(Text.class);
+
+     job.setReducerClass(ReduceClass.class);
+     job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+     opts.setAccumuloConfigs(job);
+
+     Connector connector = opts.getConnector();
+
+     TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
+     AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
+
+     FileSystem fs = FileSystem.get(conf);
+     String splitsFile = opts.workDir + "/splits.txt";
+
+     Collection<Text> splits = connector.tableOperations().listSplits(opts.getTableName(), 100);
+     try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(splitsFile))))) {
+       for (Text split : splits) {
+         out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
+       }
+       // PrintStream never throws on write errors; ask explicitly so a truncated split file is not used
+       if (out.checkError()) {
+         throw new IOException("Failed to write split file " + splitsFile);
+       }
+     }
+
+     // one reducer per tablet range: N split points delimit N + 1 ranges
+     job.setNumReduceTasks(splits.size() + 1);
+     job.setPartitionerClass(RangePartitioner.class);
+     RangePartitioner.setSplitFile(job, splitsFile);
+
+     // do not import partial output of a failed job, and report the failure to the caller
+     if (!job.waitForCompletion(true)) {
+       return 1;
+     }
+
+     // the failure directory must exist and be empty for the import
+     Path failures = new Path(opts.workDir, "failures");
+     fs.delete(failures, true);
+     fs.mkdirs(failures);
+     // With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
+     FsShell fsShell = new FsShell(conf);
+     fsShell.run(new String[] {"-chmod", "-R", "777", opts.workDir});
+     connector.tableOperations().importDirectory(opts.getTableName(), opts.workDir + "/files", opts.workDir + "/failures", false);
+
+   } catch (Exception e) {
+     throw new RuntimeException(e);
+   }
+
+   return 0;
+ }
+
+ /**
+  * Command line entry point: runs this tool through ToolRunner with a fresh Configuration and exits the JVM with the tool's return code.
+  */
+ public static void main(String[] args) throws Exception {
+   System.exit(ToolRunner.run(new Configuration(), new BulkIngestExample(), args));
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
new file mode 100644
index 0000000..4622ea0
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.Parameter;
+
+public class GenerateTestData {
+
+ static class Opts extends org.apache.accumulo.core.cli.Help {
+ @Parameter(names = "--start-row", required = true)
+ int startRow = 0;
+ @Parameter(names = "--count", required = true)
+ int numRows = 0;
+ @Parameter(names = "--output", required = true)
+ String outputFile;
+ }
+
+ public static void main(String[] args) throws IOException {
+ Opts opts = new Opts();
+ opts.parseArgs(GenerateTestData.class.getName(), args);
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
+
+ for (int i = 0; i < opts.numRows; i++) {
+ out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
+ }
+ out.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
new file mode 100644
index 0000000..a469fa6
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Creates the table named on the command line and, when split points are given as positional arguments, adds them to the new table.
+ */
+public class SetupTable {
+
+  static class Opts extends ClientOnRequiredTable {
+    // positional arguments: the split points to add to the table
+    @Parameter(description = "<split> { <split> ... } ")
+    List<String> splits = new ArrayList<>();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(SetupTable.class.getName(), args);
+
+    Connector conn = opts.getConnector();
+    conn.tableOperations().create(opts.getTableName());
+
+    // without split points the freshly created table is all that is needed
+    if (opts.splits.isEmpty()) {
+      return;
+    }
+
+    // a sorted set of the requested initial partitions
+    TreeSet<Text> initialPartitions = new TreeSet<>();
+    for (String splitPoint : opts.splits) {
+      initialPartitions.add(new Text(splitPoint));
+    }
+    conn.tableOperations().addSplits(opts.getTableName(), initialPartitions);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
new file mode 100644
index 0000000..fb47eef
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+public class VerifyIngest {
+ private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
+
+ static class Opts extends ClientOnRequiredTable {
+ @Parameter(names = "--start-row")
+ int startRow = 0;
+ @Parameter(names = "--count", required = true, description = "number of rows to verify")
+ int numRows = 0;
+ }
+
+ public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ Opts opts = new Opts();
+ opts.parseArgs(VerifyIngest.class.getName(), args);
+
+ Connector connector = opts.getConnector();
+ Scanner scanner = connector.createScanner(opts.getTableName(), opts.auths);
+
+ scanner.setRange(new Range(new Text(String.format("row_%010d", opts.startRow)), null));
+
+ Iterator<Entry<Key,Value>> si = scanner.iterator();
+
+ boolean ok = true;
+
+ for (int i = opts.startRow; i < opts.numRows; i++) {
+
+ if (si.hasNext()) {
+ Entry<Key,Value> entry = si.next();
+
+ if (!entry.getKey().getRow().toString().equals(String.format("row_%010d", i))) {
+ log.error("unexpected row key " + entry.getKey().getRow().toString() + " expected " + String.format("row_%010d", i));
+ ok = false;
+ }
+
+ if (!entry.getValue().toString().equals(String.format("value_%010d", i))) {
+ log.error("unexpected value " + entry.getValue().toString() + " expected " + String.format("value_%010d", i));
+ ok = false;
+ }
+
+ } else {
+ log.error("no more rows, expected " + String.format("row_%010d", i));
+ ok = false;
+ break;
+ }
+
+ }
+
+ if (ok) {
+ System.out.println("OK");
+ System.exit(0);
+ } else {
+ System.exit(1);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/reservations/ARS.java b/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
new file mode 100644
index 0000000..fb0277c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.reservations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jline.console.ConsoleReader;
+
+/**
+ * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait lists are also
+ * supported. In order to keep the example simple, no checking is done of the date. Also, the code is inefficient; if you are interested in improving it,
+ * take a look at the EXCERCISE comments.
+ */
+
+// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads making reservations against N resources. Each thread could
+// randomly reserve and cancel resources for a single user. When each thread finishes, it knows what the state of its single user should be. When all threads
+// finish, collect their expected state and verify the status of all users and resources. For extra credit run the test on a IAAS provider using 10 nodes and
+// 10 threads per node.
+
+public class ARS {
+
+ private static final Logger log = LoggerFactory.getLogger(ARS.class);
+
+ private Connector conn;
+ private String rTable;
+
+ /**
+  * Outcome of {@code reserve}: RESERVED when the caller holds the first reservation entry of the row, WAIT_LISTED when other entries precede it.
+  */
+ public enum ReservationResult {
+ RESERVED, WAIT_LISTED
+ }
+
+ /**
+  * @param conn
+  *          connector used to create the scanners and conditional writers
+  * @param rTable
+  *          name of the table that stores the reservations
+  */
+ public ARS(Connector conn, String rTable) {
+ this.conn = conn;
+ this.rTable = rTable;
+ }
+
+ /**
+  * Not implemented; left as an exercise (see the comment in the body).
+  *
+  * @throws UnsupportedOperationException
+  *           always
+  */
+ public List<String> setCapacity(String what, String when, int count) {
+ // EXCERCISE implement this method which atomically sets a capacity and returns anyone who was moved to the wait list if the capacity was decreased
+
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Reserves the resource {@code what} at {@code when} for {@code who}, retrying until a conditional write is accepted or {@code who} is found to be in
+  * the row already.
+  *
+  * <p>
+  * All state lives in the row {@code what:when}: column tx:seq holds a sequence number that every accepted write increments, and each res:NNNN column
+  * holds one reserver. The first attempt assumes the row is empty; every later attempt is conditioned on the sequence number read by the preceding scan,
+  * so a concurrent change causes a rejection and another scan.
+  *
+  * @return RESERVED if {@code who} holds (or already held) the first reservation entry, WAIT_LISTED if other entries come before it
+  * @throws RuntimeException
+  *           if the conditional writer reports a status other than ACCEPTED, REJECTED or UNKNOWN
+  */
+ public ReservationResult reserve(String what, String when, String who) throws Exception {
+
+ String row = what + ":" + when;
+
+ // EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation exist then the update will fail. This is a good strategy
+ // when it is expected there are usually no reservations. Could modify the code to scan first.
+
+ // The following mutation requires that the column tx:seq does not exist and will fail if it does.
+ ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq"));
+ update.put("tx", "seq", "0");
+ update.put("res", String.format("%04d", 0), who);
+
+ // result that is returned if the pending mutation gets accepted
+ ReservationResult result = ReservationResult.RESERVED;
+
+ // it is important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+ while (true) {
+ Status status = cwriter.write(update).getStatus();
+ switch (status) {
+ case ACCEPTED:
+ return result;
+ case REJECTED:
+ case UNKNOWN:
+ // read the row and decide what to do
+ break;
+ default:
+ throw new RuntimeException("Unexpected status " + status);
+ }
+
+ // EXCERCISE in the case of many threads trying to reserve a slot, this approach of immediately retrying is inefficient. Exponential back-off is good
+ // general solution to solve contention problems like this. However in this particular case, exponential back-off could penalize the earliest threads
+ // that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within
+ // the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.
+
+ scanner.setRange(new Range(row));
+
+ // -1 means "not seen in this scan"
+ int seq = -1;
+ int maxReservation = -1;
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ if (cf.equals("tx") && cq.equals("seq")) {
+ seq = Integer.parseInt(val);
+ } else if (cf.equals("res")) {
+ // EXCERCISE scanning the entire list to find if reserver is already in the list is inefficient. One possible way to solve this would be to sort the
+ // data differently in Accumulo so that finding the reserver could be done quickly.
+ // who is already in the row (for instance because an earlier write with UNKNOWN status did go through); the else below belongs to the inner if
+ if (val.equals(who))
+ if (maxReservation == -1)
+ return ReservationResult.RESERVED; // already have the first reservation
+ else
+ return ReservationResult.WAIT_LISTED; // already on wait list
+
+ // EXCERCISE the way this code finds the max reservation is very inefficient.... it would be better if it did not have to scan the entire row.
+ // One possibility is to just use the sequence number. Could also consider sorting the data in another way and/or using an iterator.
+ // keeps the qualifier of the last res entry seen; presumably relies on the scan returning qualifiers in ascending order - TODO confirm
+ maxReservation = Integer.parseInt(cq);
+ }
+ }
+
+ Condition condition = new Condition("tx", "seq");
+ if (seq >= 0)
+ condition.setValue(seq + ""); // only expect a seq # if one was seen
+
+ // next attempt: bump the sequence number and append who after the last reservation seen
+ update = new ConditionalMutation(row, condition);
+ update.put("tx", "seq", (seq + 1) + "");
+ update.put("res", String.format("%04d", maxReservation + 1), who);
+
+ // EXCERCISE if set capacity is implemented, then result should take capacity into account
+ if (maxReservation == -1)
+ result = ReservationResult.RESERVED; // if successful, will be first reservation
+ else
+ result = ReservationResult.WAIT_LISTED;
+ }
+ }
+ }
+
+ /**
+  * Removes the reservation (or wait list entry) that {@code who} holds for {@code what} at {@code when}. Does nothing if {@code who} has no entry in the
+  * row. The row is rescanned and the delete retried until the conditional write is accepted or the entry is no longer found.
+  *
+  * @throws RuntimeException
+  *           if the conditional writer reports a status other than ACCEPTED, REJECTED or UNKNOWN
+  */
+ public void cancel(String what, String when, String who) throws Exception {
+
+ String row = what + ":" + when;
+
+ // Even though this method is only deleting a column, it is important to use a conditional writer. By updating the seq # when deleting a reservation, it
+ // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
+ // when it actually got the reservation.
+
+ // it is important to use an isolated scanner so that only whole mutations are seen
+ try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+ Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+ while (true) {
+ scanner.setRange(new Range(row));
+
+ int seq = -1;
+ // qualifier of the res column whose value is who, or null if there is none
+ String reservation = null;
+
+ for (Entry<Key,Value> entry : scanner) {
+ String cf = entry.getKey().getColumnFamilyData().toString();
+ String cq = entry.getKey().getColumnQualifierData().toString();
+ String val = entry.getValue().toString();
+
+ // EXCERCISE avoid linear scan
+
+ if (cf.equals("tx") && cq.equals("seq")) {
+ seq = Integer.parseInt(val);
+ } else if (cf.equals("res") && val.equals(who)) {
+ reservation = cq;
+ }
+ }
+
+ if (reservation != null) {
+ // the delete only applies if tx:seq still has the value just read, i.e. the row was not changed since the scan
+ ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq").setValue(seq + ""));
+ update.putDelete("res", reservation);
+ update.put("tx", "seq", (seq + 1) + "");
+
+ Status status = cwriter.write(update).getStatus();
+ switch (status) {
+ case ACCEPTED:
+ // successfully canceled reservation
+ return;
+ case REJECTED:
+ case UNKNOWN:
+ // retry
+ // EXCERCISE exponential back-off could be used here
+ break;
+ default:
+ throw new RuntimeException("Unexpected status " + status);
+ }
+
+ } else {
+ // not reserved, nothing to do
+ break;
+ }
+
+ }
+ }
+ }
+
+ /**
+  * Lists the values of all "res" columns in the row {@code what:when}, in the order the scanner returns them.
+  *
+  * @return the reservers found, possibly empty
+  */
+ public List<String> list(String what, String when) throws Exception {
+   final List<String> holders = new ArrayList<>();
+
+   // an isolated scanner makes sure only whole mutations are seen
+   try (Scanner resScanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+     resScanner.setRange(new Range(what + ":" + when));
+     resScanner.fetchColumnFamily(new Text("res"));
+
+     for (Entry<Key,Value> cell : resScanner) {
+       holders.add(cell.getValue().toString());
+     }
+   }
+
+   return holders;
+ }
+
+ public static void main(String[] args) throws Exception {
+ final ConsoleReader reader = new ConsoleReader();
+ ARS ars = null;
+
+ while (true) {
+ String line = reader.readLine(">");
+ if (line == null)
+ break;
+
+ final String[] tokens = line.split("\\s+");
+
+ if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
+ // start up multiple threads all trying to reserve the same resource, no more than one should succeed
+
+ final ARS fars = ars;
+ ArrayList<Thread> threads = new ArrayList<>();
+ for (int i = 3; i < tokens.length; i++) {
+ final int whoIndex = i;
+ Runnable reservationTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ reader.println(" " + String.format("%20s", tokens[whoIndex]) + " : " + fars.reserve(tokens[1], tokens[2], tokens[whoIndex]));
+ } catch (Exception e) {
+ log.warn("Could not write to the ConsoleReader.", e);
+ }
+ }
+ };
+
+ threads.add(new Thread(reservationTask));
+ }
+
+ for (Thread thread : threads)
+ thread.start();
+
+ for (Thread thread : threads)
+ thread.join();
+
+ } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) {
+ ars.cancel(tokens[1], tokens[2], tokens[3]);
+ } else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) {
+ List<String> reservations = ars.list(tokens[1], tokens[2]);
+ if (reservations.size() > 0) {
+ reader.println(" Reservation holder : " + reservations.get(0));
+ if (reservations.size() > 1)
+ reader.println(" Wait list : " + reservations.subList(1, reservations.size()));
+ }
+ } else if (tokens[0].equals("quit") && tokens.length == 1) {
+ break;
+ } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) {
+ ZooKeeperInstance zki = new ZooKeeperInstance(new ClientConfiguration().withInstance(tokens[1]).withZkHosts(tokens[2]));
+ Connector conn = zki.getConnector(tokens[3], new PasswordToken(tokens[4]));
+ if (conn.tableOperations().exists(tokens[5])) {
+ ars = new ARS(conn, tokens[5]);
+ reader.println(" connected");
+ } else
+ reader.println(" No Such Table");
+ } else {
+ System.out.println(" Commands : ");
+ if (ars == null) {
+ reader.println(" connect <instance> <zookeepers> <user> <pass> <table>");
+ } else {
+ reader.println(" reserve <what> <when> <who> {who}");
+ reader.println(" cancel <what> <when> <who>");
+ reader.println(" list <what> <when>");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java b/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
new file mode 100644
index 0000000..608607e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.sample;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.client.RandomBatchWriter;
+import org.apache.accumulo.examples.shard.CutoffIntersectingIterator;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * A simple example of using Accumulo's sampling feature. This example does something similar to what README.sample shows using the shell. Also see
+ * {@link CutoffIntersectingIterator} and README.sample for an example of how to use sample data from within an iterator.
+ */
+public class SampleExample {
+
+  // a compaction strategy that only selects files for compaction that have no sample data or sample data created in a different way than the tables
+  static final CompactionStrategyConfig NO_SAMPLE_STRATEGY = new CompactionStrategyConfig(
+      "org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy").setOptions(Collections.singletonMap("SF_NO_SAMPLE", ""));
+
+  /**
+   * Command line options for this example; "sampex" is handed to {@link ClientOnDefaultTable} as the table to use when none is given.
+   */
+  static class Opts extends ClientOnDefaultTable {
+    public Opts() {
+      super("sampex");
+    }
+  }
+
+  /**
+   * Creates the example table (doing nothing if it already exists), writes four documents, and then scans the table's sample data before and after
+   * compactions and sampler configuration changes, printing what each scan returns.
+   *
+   * @param args
+   *          command line arguments, parsed by {@link Opts} and {@link BatchWriterOpts}
+   * @throws Exception
+   *           if any table, write or scan operation fails unexpectedly
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    // identify this example to the argument parser (previously RandomBatchWriter's name was passed by mistake)
+    opts.parseArgs(SampleExample.class.getName(), args, bwOpts);
+
+    Connector conn = opts.getConnector();
+
+    if (!conn.tableOperations().exists(opts.getTableName())) {
+      conn.tableOperations().create(opts.getTableName());
+    } else {
+      System.out.println("Table exists, not doing anything.");
+      return;
+    }
+
+    // write some data
+    BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+    bw.addMutation(createMutation("9225", "abcde", "file://foo.txt"));
+    bw.addMutation(createMutation("8934", "accumulo scales", "file://accumulo_notes.txt"));
+    bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano", "file://groceries/9/txt"));
+    bw.addMutation(createMutation("3900", "EC2 ate my homework", "file://final_project.txt"));
+    bw.flush();
+
+    SamplerConfiguration sc1 = new SamplerConfiguration(RowSampler.class.getName());
+    sc1.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "3"));
+
+    conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc1);
+
+    Scanner scanner = conn.createScanner(opts.getTableName(), Authorizations.EMPTY);
+    try {
+      System.out.println("Scanning all data :");
+      print(scanner);
+      System.out.println();
+
+      System.out.println("Scanning with sampler configuration. Data was written before sampler was set on table, scan should fail.");
+      scanner.setSamplerConfiguration(sc1);
+      try {
+        print(scanner);
+      } catch (SampleNotPresentException e) {
+        System.out.println(" Saw sample not present exception as expected.");
+      }
+      System.out.println();
+
+      // compact table to recreate sample data
+      conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+      System.out.println("Scanning after compaction (compaction should have created sample data) : ");
+      print(scanner);
+      System.out.println();
+
+      // update a document in the sample data
+      bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano, butter", "file://groceries/9/txt"));
+      bw.close();
+      System.out.println("Scanning sample after updating content for docId 2317 (should see content change in sample data) : ");
+      print(scanner);
+      System.out.println();
+
+      // change tables sampling configuration...
+      SamplerConfiguration sc2 = new SamplerConfiguration(RowSampler.class.getName());
+      sc2.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "2"));
+      conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc2);
+      // compact table to recreate sample data using new configuration
+      conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+      System.out.println("Scanning with old sampler configuration. Sample data was created using new configuration with a compaction. Scan should fail.");
+      try {
+        // try scanning with old sampler configuration
+        print(scanner);
+      } catch (SampleNotPresentException e) {
+        System.out.println(" Saw sample not present exception as expected ");
+      }
+      System.out.println();
+
+      // update expected sampler configuration on scanner
+      scanner.setSamplerConfiguration(sc2);
+
+      System.out.println("Scanning with new sampler configuration : ");
+      print(scanner);
+      System.out.println();
+    } finally {
+      // release the scanner even when one of the steps above fails
+      scanner.close();
+    }
+
+  }
+
+  /**
+   * Prints every key/value pair the scanner returns, one entry per line.
+   */
+  private static void print(Scanner scanner) {
+    for (Entry<Key,Value> entry : scanner) {
+      System.out.println(" " + entry.getKey() + " " + entry.getValue());
+    }
+  }
+
+  /**
+   * Builds a mutation for row {@code docId} that stores the content and the url under the "doc" column family.
+   */
+  private static Mutation createMutation(String docId, String content, String url) {
+    Mutation m = new Mutation(docId);
+    m.put("doc", "context", content);
+    m.put("doc", "url", url);
+    return m;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java b/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
new file mode 100644
index 0000000..7251148
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.collect.Iterators;
+
+/**
+ * Using the doc2word table created by Reverse.java, this program randomly selects N words per document. Then it continually queries a random set of words in
+ * the shard table (created by {@link Index}) using the {@link IntersectingIterator}.
+ */
+public class ContinuousQuery {
+
+ static class Opts extends ClientOpts {
+ @Parameter(names = "--shardTable", required = true, description = "name of the shard table")
+ String tableName = null;
+ @Parameter(names = "--doc2Term", required = true, description = "name of the doc2Term table")
+ String doc2Term;
+ @Parameter(names = "--terms", required = true, description = "the number of terms in the query")
+ int numTerms;
+ @Parameter(names = "--count", description = "the number of queries to run")
+ long iterations = Long.MAX_VALUE;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ BatchScannerOpts bsOpts = new BatchScannerOpts();
+ opts.parseArgs(ContinuousQuery.class.getName(), args, bsOpts);
+
+ Connector conn = opts.getConnector();
+
+ ArrayList<Text[]> randTerms = findRandomTerms(conn.createScanner(opts.doc2Term, opts.auths), opts.numTerms);
+
+ Random rand = new Random();
+
+ BatchScanner bs = conn.createBatchScanner(opts.tableName, opts.auths, bsOpts.scanThreads);
+ bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+
+ for (long i = 0; i < opts.iterations; i += 1) {
+ Text[] columns = randTerms.get(rand.nextInt(randTerms.size()));
+
+ bs.clearScanIterators();
+ bs.clearColumns();
+
+ IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+ IntersectingIterator.setColumnFamilies(ii, columns);
+ bs.addScanIterator(ii);
+ bs.setRanges(Collections.singleton(new Range()));
+
+ long t1 = System.currentTimeMillis();
+ int count = Iterators.size(bs.iterator());
+ long t2 = System.currentTimeMillis();
+
+ System.out.printf(" %s %,d %6.3f%n", Arrays.asList(columns), count, (t2 - t1) / 1000.0);
+ }
+
+ bs.close();
+
+ }
+
+ private static ArrayList<Text[]> findRandomTerms(Scanner scanner, int numTerms) {
+
+ Text currentRow = null;
+
+ ArrayList<Text> words = new ArrayList<>();
+ ArrayList<Text[]> ret = new ArrayList<>();
+
+ Random rand = new Random();
+
+ for (Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+
+ if (currentRow == null)
+ currentRow = key.getRow();
+
+ if (!currentRow.equals(key.getRow())) {
+ selectRandomWords(words, ret, rand, numTerms);
+ words.clear();
+ currentRow = key.getRow();
+ }
+
+ words.add(key.getColumnFamily());
+
+ }
+
+ selectRandomWords(words, ret, rand, numTerms);
+
+ return ret;
+ }
+
+ private static void selectRandomWords(ArrayList<Text> words, ArrayList<Text[]> ret, Random rand, int numTerms) {
+ if (words.size() >= numTerms) {
+ Collections.shuffle(words, rand);
+ Text docWords[] = new Text[numTerms];
+ for (int i = 0; i < docWords.length; i++) {
+ docWords[i] = words.get(i);
+ }
+
+ ret.add(docWords);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java b/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
new file mode 100644
index 0000000..18fe914
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.shard;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.RowColumnSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+
+/**
+ * This iterator uses a sample built from the Column Qualifier to quickly avoid intersecting iterator queries that may return too many documents.
+ */
+
+public class CutoffIntersectingIterator extends IntersectingIterator {
+
+  // intersecting iterator that runs over the sample data only
+  private IntersectingIterator sampleII;
+  // largest number of sample matches for which the full query is still run
+  private int sampleMax;
+  // false when the last seek found more than sampleMax matches in the sample
+  private boolean hasTop;
+
+  /**
+   * Stores the cutoff in the iterator configuration under the "cutoff" option.
+   *
+   * @param iterCfg
+   *          iterator setting to add the option to
+   * @param cutoff
+   *          cutoff value, must not be negative
+   * @throws IllegalArgumentException
+   *           if {@code cutoff} is negative
+   */
+  public static void setCutoff(IteratorSetting iterCfg, int cutoff) {
+    checkArgument(cutoff >= 0);
+    iterCfg.addOption("cutoff", cutoff + "");
+  }
+
+  @Override
+  public boolean hasTop() {
+    return hasTop && super.hasTop();
+  }
+
+  /**
+   * Counts matches in the sample first and only seeks the real intersecting iterator when that count does not exceed the limit derived from the cutoff.
+   */
+  @Override
+  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+
+    sampleII.seek(range, seekColumnFamilies, inclusive);
+
+    // this check will be redone whenever iterator stack is torn down and recreated.
+    int count = 0;
+    while (count <= sampleMax && sampleII.hasTop()) {
+      sampleII.next();
+      count++;
+    }
+
+    if (count > sampleMax) {
+      // In a real application would probably want to return a key value that indicates too much data. Since this would execute for each tablet, some tablets
+      // may return data. For tablets that did not return data, would want an indication.
+      hasTop = false;
+    } else {
+      hasTop = true;
+      super.seek(range, seekColumnFamilies, inclusive);
+    }
+  }
+
+  /**
+   * Initializes this iterator and a second intersecting iterator over a deep copy of the source made with sampling enabled.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+
+    IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+
+    setMax(sampleEnv, options);
+
+    SortedKeyValueIterator<Key,Value> sampleDC = source.deepCopy(sampleEnv);
+    sampleII = new IntersectingIterator();
+    // NOTE(review): the sample iterator is initialized with env rather than sampleEnv - confirm this is intended
+    sampleII.init(sampleDC, options, env);
+
+  }
+
+  /**
+   * Checks that the sample was built by a {@link RowColumnSampler} over the column qualifier only.
+   *
+   * @param sampleConfig
+   *          sampler configuration to check
+   * @throws NullPointerException
+   *           if {@code sampleConfig} is null
+   * @throws IllegalArgumentException
+   *           if the sampler class or its options are not the expected ones, including when the "qualifier" option is absent
+   */
+  static void validateSamplerConfig(SamplerConfiguration sampleConfig) {
+    requireNonNull(sampleConfig, "Sampler configuration is missing");
+    checkArgument(sampleConfig.getSamplerClassName().equals(RowColumnSampler.class.getName()), "Unexpected Sampler " + sampleConfig.getSamplerClassName());
+    // constant-first comparison: an absent "qualifier" option fails the check with its message instead of a bare NullPointerException
+    checkArgument("true".equals(sampleConfig.getOptions().get("qualifier")), "Expected sample on column qualifier");
+    checkArgument(isNullOrFalse(sampleConfig.getOptions(), "row", "family", "visibility"), "Expected sample on column qualifier only");
+  }
+
+  /**
+   * Derives {@link #sampleMax} by dividing the "cutoff" option by the sampler's "modulus" option and rounding.
+   */
+  private void setMax(IteratorEnvironment sampleEnv, Map<String,String> options) {
+    String cutoffValue = options.get("cutoff");
+    SamplerConfiguration sampleConfig = sampleEnv.getSamplerConfiguration();
+
+    // Ensure the sample was constructed in an expected way. If the sample is not built as expected, then can not draw conclusions based on sample.
+    requireNonNull(cutoffValue, "Expected cutoff option is missing");
+    validateSamplerConfig(sampleConfig);
+
+    int modulus = Integer.parseInt(sampleConfig.getOptions().get("modulus"));
+
+    sampleMax = Math.round(Float.parseFloat(cutoffValue) / modulus);
+  }
+
+  /**
+   * Returns true when none of the given keys maps to the string "true".
+   */
+  private static boolean isNullOrFalse(Map<String,String> options, String... keys) {
+    for (String key : keys) {
+      if ("true".equals(options.get(key))) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Index.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Index.java b/src/main/java/org/apache/accumulo/examples/shard/Index.java
new file mode 100644
index 0000000..0325e72
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Index.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This program indexes a set of documents given on the command line into a shard table.
+ *
+ * What it writes to the table is row = partition id, column family = term, column qualifier = document id.
+ */
+public class Index {
+
+  /**
+   * Formats the absolute value of {@code partition} as an eight digit, zero padded hexadecimal row id.
+   */
+  static Text genPartition(int partition) {
+    return new Text(String.format("%08x", Math.abs(partition)));
+  }
+
+  /**
+   * Indexes one document: every distinct lower-cased token becomes a column family in the row of the document's partition, with the document id as
+   * column qualifier and an empty value.
+   *
+   * @param numPartitions
+   *          number of partitions; the partition is chosen from the hash code of the document text
+   * @param docId
+   *          id written as column qualifier
+   * @param doc
+   *          text of the document
+   * @param splitRegex
+   *          regular expression used to split the text into tokens
+   * @param bw
+   *          writer that receives the mutation
+   */
+  public static void index(int numPartitions, Text docId, String doc, String splitRegex, BatchWriter bw) throws Exception {
+
+    Text partition = genPartition(doc.hashCode() % numPartitions);
+
+    Mutation m = new Mutation(partition);
+
+    HashSet<String> tokensSeen = new HashSet<>();
+
+    for (String token : doc.split(splitRegex)) {
+      String term = token.toLowerCase();
+
+      // Set.add reports whether the term is new, so every term is written once per document
+      if (tokensSeen.add(term)) {
+        m.put(new Text(term), docId, new Value(new byte[0]));
+      }
+    }
+
+    if (m.size() > 0)
+      bw.addMutation(m);
+  }
+
+  /**
+   * Indexes a file, or every file below a directory, using the absolute path of each file as its document id.
+   *
+   * @param numPartitions
+   *          number of partitions
+   * @param src
+   *          file or directory to index
+   * @param splitRegex
+   *          regular expression used to split the text into tokens
+   * @param bw
+   *          writer that receives the mutations
+   */
+  public static void index(int numPartitions, File src, String splitRegex, BatchWriter bw) throws Exception {
+    if (src.isDirectory()) {
+      File[] files = src.listFiles();
+      if (files != null) {
+        for (File child : files) {
+          index(numPartitions, child, splitRegex, bw);
+        }
+      }
+      return;
+    }
+
+    StringBuilder sb = new StringBuilder();
+
+    // try-with-resources closes the reader even when reading fails
+    try (FileReader fr = new FileReader(src)) {
+      char[] data = new char[4096];
+      int len;
+      while ((len = fr.read(data)) != -1) {
+        sb.append(data, 0, len);
+      }
+    }
+
+    index(numPartitions, new Text(src.getAbsolutePath()), sb.toString(), splitRegex, bw);
+  }
+
+  /** Command line options for the indexer. */
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--partitions", required = true, description = "the number of shards to create")
+    int partitions;
+    @Parameter(required = true, description = "<file> { <file> ... }")
+    List<String> files = new ArrayList<>();
+  }
+
+  /**
+   * Indexes every file or directory named on the command line, splitting documents on runs of non-word characters.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(Index.class.getName(), args, bwOpts);
+
+    String splitRegex = "\\W+";
+
+    BatchWriter bw = opts.getConnector().createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+    try {
+      for (String filename : opts.files) {
+        index(opts.partitions, new File(filename), splitRegex, bw);
+      }
+    } finally {
+      // close the writer even when indexing one of the files fails
+      bw.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Query.java b/src/main/java/org/apache/accumulo/examples/shard/Query.java
new file mode 100644
index 0000000..77b459a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Query.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This program queries a set of terms in the shard table (populated by {@link Index}) using the {@link IntersectingIterator}.
+ */
+public class Query {
+
+  /** Command line options for the query example. */
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(description = " term { <term> ... }")
+    List<String> terms = new ArrayList<>();
+
+    @Parameter(names = {"--sample"}, description = "Do queries against sample, useful when sample is built using column qualifier")
+    private boolean useSample = false;
+
+    @Parameter(names = {"--sampleCutoff"},
+        description = "Use sample data to determine if a query might return a number of documents over the cutoff. This check is per tablet.")
+    private Integer sampleCutoff = null;
+  }
+
+  /**
+   * Runs an intersecting query for the given terms over the whole table and collects the column qualifiers of the returned keys.
+   *
+   * @param bs
+   *          batch scanner to configure and read from; it is not closed by this method
+   * @param terms
+   *          terms to intersect, used as column families
+   * @param cutoff
+   *          when not null, a {@link CutoffIntersectingIterator} configured with this cutoff is used instead of a plain {@link IntersectingIterator}
+   * @return the column qualifier of every returned entry, as a string
+   */
+  public static List<String> query(BatchScanner bs, List<String> terms, Integer cutoff) {
+
+    Text[] columns = new Text[terms.size()];
+    int i = 0;
+    for (String term : terms) {
+      columns[i++] = new Text(term);
+    }
+
+    IteratorSetting ii;
+
+    if (cutoff != null) {
+      ii = new IteratorSetting(20, "ii", CutoffIntersectingIterator.class);
+      CutoffIntersectingIterator.setCutoff(ii, cutoff);
+    } else {
+      ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+    }
+
+    IntersectingIterator.setColumnFamilies(ii, columns);
+    bs.addScanIterator(ii);
+    bs.setRanges(Collections.singleton(new Range()));
+    List<String> result = new ArrayList<>();
+    for (Entry<Key,Value> entry : bs) {
+      result.add(entry.getKey().getColumnQualifier().toString());
+    }
+    return result;
+  }
+
+  /**
+   * Queries the table for the terms given on the command line and prints one line per result.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchScannerOpts bsOpts = new BatchScannerOpts();
+    opts.parseArgs(Query.class.getName(), args, bsOpts);
+    Connector conn = opts.getConnector();
+    BatchScanner bs = conn.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
+    try {
+      bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+      if (opts.useSample) {
+        // fetch the configuration once so the validated object is the one set on the scanner
+        SamplerConfiguration samplerConfig = conn.tableOperations().getSamplerConfiguration(opts.getTableName());
+        CutoffIntersectingIterator.validateSamplerConfig(samplerConfig);
+        bs.setSamplerConfiguration(samplerConfig);
+      }
+      for (String entry : query(bs, opts.terms, opts.sampleCutoff))
+        System.out.println(" " + entry);
+    } finally {
+      // release the batch scanner even when validation or the query fails
+      bs.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Reverse.java b/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
new file mode 100644
index 0000000..05be206
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * The program reads an accumulo table written by {@link Index} and writes out to another table. It writes out a mapping of documents to terms. The document to
+ * term mapping is used by {@link ContinuousQuery}.
+ */
+public class Reverse {
+
+  /** Command line options naming the source shard table and the target doc2Term table. */
+  static class Opts extends ClientOpts {
+    @Parameter(names = "--shardTable")
+    String shardTable = "shard";
+    @Parameter(names = "--doc2Term")
+    String doc2TermTable = "doc2Term";
+  }
+
+  /**
+   * Copies every entry of the shard table into the doc2Term table with the roles swapped: the column qualifier becomes the row and the column family
+   * is kept, with an empty qualifier and an empty value.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(Reverse.class.getName(), args, scanOpts, bwOpts);
+
+    Connector conn = opts.getConnector();
+
+    Scanner scanner = conn.createScanner(opts.shardTable, opts.auths);
+    try {
+      scanner.setBatchSize(scanOpts.scanBatchSize);
+      BatchWriter bw = conn.createBatchWriter(opts.doc2TermTable, bwOpts.getBatchWriterConfig());
+      try {
+        for (Entry<Key,Value> entry : scanner) {
+          Key key = entry.getKey();
+          Mutation m = new Mutation(key.getColumnQualifier());
+          m.put(key.getColumnFamily(), new Text(), new Value(new byte[0]));
+          bw.addMutation(m);
+        }
+      } finally {
+        // close the writer even when the scan fails part way through
+        bw.close();
+      }
+    } finally {
+      // the scanner was never released before
+      scanner.close();
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java b/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
new file mode 100644
index 0000000..df68200
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shell;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.shell.Shell;
+import org.apache.accumulo.shell.Shell.Command;
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Example shell command that takes no arguments and prints a single fixed line.
+ */
+public class DebugCommand extends Command {
+
+  @Override
+  public String description() {
+    return "prints a message to test extension feature";
+  }
+
+  @Override
+  public int numArgs() {
+    return 0;
+  }
+
+  /**
+   * Hands a one-element, sorted set of lines to the shell for printing and reports success.
+   *
+   * @return always 0
+   */
+  @Override
+  public int execute(String fullCommand, CommandLine cl, Shell shellState) throws Exception {
+    Set<String> message = new TreeSet<>();
+    message.add("This is a test");
+
+    shellState.printLines(message.iterator(), true);
+
+    return 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java b/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
new file mode 100644
index 0000000..be94b9c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shell;
+
+import org.apache.accumulo.shell.Shell.Command;
+import org.apache.accumulo.shell.ShellExtension;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(ShellExtension.class)
+public class ExampleShellExtension extends ShellExtension {
+
+ @Override
+ public String getExtensionName() {
+ return "ExampleShellExtension";
+ }
+
+ @Override
+ public Command[] getCommands() {
+ return new Command[] {new DebugCommand()};
+ }
+
+}