Posted to commits@fluo.apache.org by mw...@apache.org on 2016/09/21 16:36:37 UTC
incubator-fluo-recipes git commit: Fixes #102 - Simplify Accumulo export queue recipe
Repository: incubator-fluo-recipes
Updated Branches:
refs/heads/master d7347ee15 -> 6f5177363
Fixes #102 - Simplify Accumulo export queue recipe
* AccumuloExporter is now an abstract class that users implement; it
handles writing the mutations they supply to Accumulo.
* AccumuloExport objects are no longer placed on queue
* Renamed SharedBatchWriter to AccumuloWriter
* AccumuloExportQueue class contains code for configuring queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/6f517736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/6f517736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/6f517736
Branch: refs/heads/master
Commit: 6f5177363ca309c35403fd822e3270559e9dfdbd
Parents: d7347ee
Author: Mike Walch <mw...@apache.org>
Authored: Wed Sep 7 12:04:44 2016 -0400
Committer: Mike Walch <mw...@apache.org>
Committed: Wed Sep 21 12:30:22 2016 -0400
----------------------------------------------------------------------
docs/accumulo-export-queue.md | 89 ++++++
docs/accumulo-export.md | 57 ----
docs/export-queue.md | 2 +-
modules/accumulo/pom.xml | 4 -
.../recipes/accumulo/export/AccumuloExport.java | 37 ---
.../accumulo/export/AccumuloExportQueue.java | 278 +++++++++++++++++++
.../accumulo/export/AccumuloExporter.java | 45 +--
.../accumulo/export/AccumuloReplicator.java | 83 ++++++
.../accumulo/export/DifferenceExport.java | 101 -------
.../accumulo/export/ReplicationExport.java | 82 ------
.../accumulo/export/SharedBatchWriter.java | 149 ----------
.../fluo/recipes/accumulo/export/TableInfo.java | 36 ---
.../accumulo/export/AccumuloExportTest.java | 102 +++++++
.../accumulo/export/DifferenceExportTest.java | 105 -------
.../fluo/recipes/core/export/ExportQueue.java | 4 +
.../recipes/test/export/AccumuloExporterIT.java | 32 +--
.../test/export/AccumuloReplicatorIT.java | 47 ++--
.../recipes/test/export/SimpleExporter.java | 33 +++
.../fluo/recipes/test/export/TestExport.java | 40 ---
19 files changed, 641 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
new file mode 100644
index 0000000..dde04fb
--- /dev/null
+++ b/docs/accumulo-export-queue.md
@@ -0,0 +1,89 @@
+# Accumulo Export Queue Specialization
+
+## Background
+
+The [Export Queue Recipe][1] provides a generic foundation for building an export mechanism to any
+external data store. The [AccumuloExportQueue] provides an implementation of this recipe for
+Accumulo. The [AccumuloExportQueue] is located in the 'fluo-recipes-accumulo' module and provides
+the following functionality:
+
+ * Safely batches writes to Accumulo made by multiple transactions exporting data.
+ * Stores Accumulo connection information in Fluo configuration, making it accessible by Export
+ Observers running on other nodes.
+ * Provides utility code that makes it easier and shorter to code common Accumulo export patterns.
+
+## Example Use
+
+Exporting to Accumulo is easy. Follow the steps below:
+
+1. Implement a class that extends [AccumuloExporter]. This class will process exported objects that
+ are placed on your export queue. For example, the `SimpleExporter` class below processes String
+ key/value exports and generates mutations for Accumulo.
+
+ ```java
+ public class SimpleExporter extends AccumuloExporter<String, String> {
+
+ @Override
+ protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+ Mutation m = new Mutation(export.getKey());
+ m.put("cf", "cq", export.getSequence(), export.getValue());
+ return Collections.singleton(m);
+ }
+ }
+ ```
+
+2. With a `SimpleExporter` created, configure an [AccumuloExportQueue] to use `SimpleExporter` and
+ give it information on how to connect to Accumulo.
+
+ ```java
+
+ FluoConfiguration fluoConfig = ...;
+
+ // Set accumulo configuration
+ String instance = // Name of accumulo instance exporting to
+ String zookeepers = // Zookeepers used by Accumulo instance exporting to
+ String user = // Accumulo username, user that can write to exportTable
+ String password = // Accumulo user password
+ String exportTable = // Name of table to export to
+
+ // Configure accumulo export queue
+ AccumuloExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
+ SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), numMapBuckets),
+ new AccumuloExportQueue.Options(instance, zookeepers, user, password, exportTable));
+
+ // Initialize Fluo using fluoConfig
+ ```
+
+3. Export queues can be retrieved in Fluo observers and objects can be added to them:
+
+ ```java
+ public class MyObserver extends AbstractObserver {
+
+ ExportQueue<String, String> exportQ;
+
+ @Override
+ public void init(Context context) throws Exception {
+ exportQ = ExportQueue.getInstance(EXPORT_QUEUE_ID, context.getAppConfiguration());
+ }
+
+ @Override
+ public void process(TransactionBase tx, Bytes row, Column col) {
+
+ // Read some data and do some work
+
+ // Add results to export queue
+ String key = // key that identifies export
+ String value = // object to export
+ exportQ.add(tx, key, value);
+ }
+ ```
+
+## Other use cases
+
+[AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a Fluo table to Accumulo.
+
+[1]: export-queue.md
+[AccumuloExportQueue]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
+[AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+[AccumuloReplicator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
+
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export.md b/docs/accumulo-export.md
deleted file mode 100644
index 4604f2c..0000000
--- a/docs/accumulo-export.md
+++ /dev/null
@@ -1,57 +0,0 @@
-# Accumulo Export Queue Specialization
-
-## Background
-
-The [Export Queue Recipe][1] provides a generic foundation for building export
-mechanism to any external data store. A specific Export Queue implementation
-for Accumulo is provided in the Fluo Recipes Accumulo module.
-
-This implementation provides the following functionality :
-
- * Safely batches writes to Accumulo made by multiple transactions exporting data.
- * Stores Accumulo connection information in Fluo configuration, making it accessible by Export Observers running on other nodes.
- * Provides utility code that make it easier and shorter to code common Accumulo export patterns.
-
-## Example Use
-
-Exporting to Accumulo is easy. Only two things need to be done.
-
- * Configure the export queue.
- * Implement [AccumuloExport][2] with custom code that generates Accumulo mutations.
-
-The following code shows how to configure an Export Queue that will write to an
-external Accumulo table.
-
-```java
-
-FluoConfiguration fluoConfig = ...;
-
-//Configure an export queue to use the classes Fluo Recipes provides for
-//exporting to Accumulo
-ExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
- AccumuloExporter.class.getName(), String.class.getName(), AccumuloExport.class.getName(),
- numMapBuckets));
-
-String instance = //Name of accumulo instance exporting to
-String zookeepers = //zookeepers used by Accumulo instance exporting to
-String user = //Accumulo username, user that can write to exportTable
-String password = //Accumulo user password
-String exportTable = //Name of table to export to
-
-//Configure the Accumulo table to export to.
-AccumuloExporter.setExportTableInfo(fluoConfig, EXPORT_QUEUE_ID,
- new TableInfo(instance, zookeepers, user, password, exportTable));
-
-//initialize Fluo using fluoConfig
-
-```
-
-After the export queue is initialized as specified above, any Object that
-implements [AccumuloExport][2] can be added to the queue. For the common
-pattern of deleting old data and inserting new data, consider extending
-[DifferenceExport][3].
-
-[1]: export-queue.md
-[2]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
-[3]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
-
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index 0120113..c366e9d 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -281,5 +281,5 @@ example of write skew mentioned in the Percolater paper.
[1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
[2]: https://en.wikipedia.org/wiki/Serializability
-[3]: accumulo-export.md
+[3]: accumulo-export-queue.md
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index 03df1e5..945b9fc 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -25,10 +25,6 @@
<name>Fluo Recipes Accumulo</name>
<dependencies>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
deleted file mode 100644
index 9b31b7c..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-
-import org.apache.accumulo.core.data.Mutation;
-
-/**
- * Implemented by users to export data to Accumulo.
- *
- * @param <K> Export queue key type
- * @since 1.0.0
- */
-public interface AccumuloExport<K> {
-
- /**
- * Creates mutations for export from user's data
- *
- * @param key Export queue key
- * @param seq Export sequence number
- */
- Collection<Mutation> toMutations(K key, long seq);
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
new file mode 100644
index 0000000..fdba5f3
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+
+public class AccumuloExportQueue {
+
+ /**
+ * Configures AccumuloExportQueue
+ *
+ * @param fc Fluo configuration
+ * @param eqo Export queue options
+ * @param ao Accumulo export queue options
+ */
+ public static void configure(FluoConfiguration fc, ExportQueue.Options eqo, Options ao) {
+ ExportQueue.configure(fc, eqo);
+ AccumuloWriter.setConfig(fc.getAppConfiguration(), eqo.getQueueId(), ao);
+ }
+
+ /**
+ * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is
+ * generated for old and new data and represents how the data should exist in Accumulo. When
+ * comparing each row/column/value (RCV) of old and new data, mutations are generated using the
+ * following rules:
+ * <ul>
+ * <li>If old and new data have the same RCV, nothing is done.
+ * <li>If old and new data have same row/column but different values, an update mutation is
+ * created for the row/column.
+ * <li>If old data has a row/column that is not in the new data, a delete mutation is generated.
+ * <li>If new data has a row/column that is not in the old data, an insert mutation is generated.
+ * <li>Only one mutation is generated per row.
+ * <li>The export sequence number is used for the timestamp in the mutation.
+ * </ul>
+ *
+ * @param seq Export sequence number
+ * @param oldData Map containing old row/column data
+ * @param newData Map containing new row/column data
+ */
+ public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData,
+ Map<RowColumn, Bytes> newData) {
+ Map<Bytes, Mutation> mutationMap = new HashMap<>();
+ for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
+ RowColumn rc = entry.getKey();
+ if (!newData.containsKey(rc)) {
+ Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
+ m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
+ seq);
+ }
+ }
+ for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
+ RowColumn rc = entry.getKey();
+ Column col = rc.getColumn();
+ Bytes newVal = entry.getValue();
+ Bytes oldVal = oldData.get(rc);
+ if (oldVal == null || !oldVal.equals(newVal)) {
+ Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
+ m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
+ }
+ }
+ return mutationMap.values();
+ }
+
+ /**
+ * Writes mutations to Accumulo using a shared batch writer
+ *
+ * @since 1.0.0
+ */
+ static class AccumuloWriter {
+
+ private static class Mutations {
+ List<Mutation> mutations;
+ CountDownLatch cdl = new CountDownLatch(1);
+
+ Mutations(Collection<Mutation> mutations) {
+ this.mutations = new ArrayList<>(mutations);
+ }
+ }
+
+ /**
+ * Sets AccumuloWriter config in app configuration
+ */
+ static void setConfig(SimpleConfiguration sc, String id, Options ac) {
+ String prefix = "recipes.accumulo.writer." + id;
+ sc.setProperty(prefix + ".instance", ac.instanceName);
+ sc.setProperty(prefix + ".zookeepers", ac.zookeepers);
+ sc.setProperty(prefix + ".user", ac.user);
+ sc.setProperty(prefix + ".password", ac.password);
+ sc.setProperty(prefix + ".table", ac.table);
+ }
+
+ /**
+ * Gets Accumulo Options from app configuration
+ */
+ static Options getConfig(SimpleConfiguration sc, String id) {
+ String prefix = "recipes.accumulo.writer." + id;
+ String instanceName = sc.getString(prefix + ".instance");
+ String zookeepers = sc.getString(prefix + ".zookeepers");
+ String user = sc.getString(prefix + ".user");
+ String password = sc.getString(prefix + ".password");
+ String table = sc.getString(prefix + ".table");
+ return new Options(instanceName, zookeepers, user, password, table);
+ }
+
+ private static class ExportTask implements Runnable {
+
+ private BatchWriter bw;
+
+ ExportTask(String instanceName, String zookeepers, String user, String password, String table)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ ZooKeeperInstance zki =
+ new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(
+ zookeepers));
+
+ // TODO need to close batch writer
+ Connector conn = zki.getConnector(user, new PasswordToken(password));
+ try {
+ bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ } catch (TableNotFoundException tnfe) {
+ try {
+ conn.tableOperations().create(table);
+ } catch (TableExistsException e) {
+ // nothing to do
+ }
+
+ bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ }
+ }
+
+ @Override
+ public void run() {
+
+ ArrayList<Mutations> exports = new ArrayList<>();
+
+ while (true) {
+ try {
+ exports.clear();
+
+ // gather export from all threads that have placed an item on the queue
+ exports.add(exportQueue.take());
+ exportQueue.drainTo(exports);
+
+ for (Mutations ml : exports) {
+ bw.addMutations(ml.mutations);
+ }
+
+ bw.flush();
+
+ // notify all threads waiting after flushing
+ for (Mutations ml : exports) {
+ ml.cdl.countDown();
+ }
+
+ } catch (InterruptedException | MutationsRejectedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ }
+
+ private static LinkedBlockingQueue<Mutations> exportQueue = null;
+
+ private AccumuloWriter(String instanceName, String zookeepers, String user, String password,
+ String table) {
+
+ // TODO: fix this write to static and remove findbugs max rank override in pom.xml
+ exportQueue = new LinkedBlockingQueue<>(10000);
+
+ try {
+ Thread queueProcessingTask =
+ new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
+ queueProcessingTask.setDaemon(true);
+ queueProcessingTask.start();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static Map<String, AccumuloWriter> exporters = new HashMap<>();
+
+
+ static AccumuloWriter getInstance(SimpleConfiguration sc, String id) {
+ return getInstance(getConfig(sc, id));
+ }
+
+ static AccumuloWriter getInstance(Options ac) {
+ return getInstance(ac.instanceName, ac.zookeepers, ac.user, ac.password, ac.table);
+ }
+
+ static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers,
+ String user, String password, String table) {
+
+ String key =
+ instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
+
+ AccumuloWriter ret = exporters.get(key);
+
+ if (ret == null) {
+ ret = new AccumuloWriter(instanceName, zookeepers, user, password, table);
+ exporters.put(key, ret);
+ }
+
+ return ret;
+ }
+
+ void write(Collection<Mutation> mutations) {
+ Mutations work = new Mutations(mutations);
+ exportQueue.add(work);
+ try {
+ work.cdl.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ /**
+ * Accumulo export queue options
+ *
+ * @since 1.0.0
+ */
+ public static class Options {
+ String instanceName;
+ String zookeepers;
+ String user;
+ String password;
+ String table;
+
+ public Options(String instanceName, String zookeepers, String user, String password,
+ String table) {
+ this.instanceName = instanceName;
+ this.zookeepers = zookeepers;
+ this.user = user;
+ this.password = password;
+ this.table = table;
+ }
+ }
+}
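The comparison rules documented on `generateMutations` above can be tried out without an Accumulo dependency. The following is a minimal sketch, not the committed code: it assumes cells are plain `"row/column"` strings instead of `RowColumn`/`Bytes`, and it records textual changes per row instead of building a `Mutation`. The class and method names (`DifferenceSketch`, `diff`, `rowOf`, `columnOf`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in: cells are "row/column" strings rather than RowColumn/Bytes,
// and each row maps to a list of textual changes rather than an Accumulo Mutation.
class DifferenceSketch {

  static String rowOf(String cell) {
    return cell.substring(0, cell.indexOf('/'));
  }

  static String columnOf(String cell) {
    return cell.substring(cell.indexOf('/') + 1);
  }

  static Map<String, List<String>> diff(Map<String, String> oldData,
      Map<String, String> newData) {
    // One entry per row, mirroring "only one mutation is generated per row"
    Map<String, List<String>> perRow = new TreeMap<>();

    // Sorted copies are used only to make the example output deterministic.
    // Cells present in the old data but absent from the new data become deletes.
    for (String cell : new TreeMap<>(oldData).keySet()) {
      if (!newData.containsKey(cell)) {
        perRow.computeIfAbsent(rowOf(cell), r -> new ArrayList<>())
            .add("delete " + columnOf(cell));
      }
    }

    // Cells that are new, or whose value changed, become puts; equal cells are skipped.
    for (Map.Entry<String, String> e : new TreeMap<>(newData).entrySet()) {
      String oldVal = oldData.get(e.getKey());
      if (oldVal == null || !oldVal.equals(e.getValue())) {
        perRow.computeIfAbsent(rowOf(e.getKey()), r -> new ArrayList<>())
            .add("put " + columnOf(e.getKey()) + "=" + e.getValue());
      }
    }
    return perRow;
  }

  public static void main(String[] args) {
    Map<String, String> oldData = new HashMap<>();
    oldData.put("r1/a", "1");
    oldData.put("r1/b", "2");
    oldData.put("r2/a", "3");

    Map<String, String> newData = new HashMap<>();
    newData.put("r1/a", "1"); // unchanged, so nothing is emitted
    newData.put("r1/b", "5"); // changed value, so a put is emitted
    newData.put("r3/a", "7"); // new cell, so a put is emitted

    System.out.println(diff(oldData, newData));
  }
}
```

The sketch leaves out the export sequence number, which the committed method passes as the timestamp of every put and delete.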
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index 7e48d12..54fc8ca 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -16,60 +16,45 @@
package org.apache.fluo.recipes.accumulo.export;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.observer.Observer.Context;
import org.apache.fluo.recipes.core.export.Exporter;
import org.apache.fluo.recipes.core.export.SequencedExport;
/**
- * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo
+ * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a
+ * {@link AccumuloExportQueue.AccumuloWriter}
*
- * @param <K> Export queue key type
* @since 1.0.0
*/
-public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
+public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
- private SharedBatchWriter sbw;
+ private AccumuloExportQueue.AccumuloWriter accumuloWriter;
@Override
public void init(String queueId, Context context) throws Exception {
-
- SimpleConfiguration appConf = context.getAppConfiguration();
-
- String instanceName = appConf.getString("recipes.accumuloExporter." + queueId + ".instance");
- String zookeepers = appConf.getString("recipes.accumuloExporter." + queueId + ".zookeepers");
- String user = appConf.getString("recipes.accumuloExporter." + queueId + ".user");
- // TODO look into using delegation token
- String password = appConf.getString("recipes.accumuloExporter." + queueId + ".password");
- String table = appConf.getString("recipes.accumuloExporter." + queueId + ".table");
-
- sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, password, table);
- }
-
- public static void setExportTableInfo(FluoConfiguration fconf, String queueId, TableInfo ti) {
- SimpleConfiguration appConf = fconf.getAppConfiguration();
- appConf.setProperty("recipes.accumuloExporter." + queueId + ".instance", ti.instanceName);
- appConf.setProperty("recipes.accumuloExporter." + queueId + ".zookeepers", ti.zookeepers);
- appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", ti.user);
- appConf.setProperty("recipes.accumuloExporter." + queueId + ".password", ti.password);
- appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", ti.table);
+ accumuloWriter =
+ AccumuloExportQueue.AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
}
@Override
- protected void processExports(Iterator<SequencedExport<K, AccumuloExport<K>>> exports) {
+ protected void processExports(Iterator<SequencedExport<K, V>> exports) {
+
ArrayList<Mutation> buffer = new ArrayList<>();
while (exports.hasNext()) {
- SequencedExport<K, AccumuloExport<K>> export = exports.next();
- buffer.addAll(export.getValue().toMutations(export.getKey(), export.getSequence()));
+ SequencedExport<K, V> export = exports.next();
+ buffer.addAll(processExport(export));
}
if (buffer.size() > 0) {
- sbw.write(buffer);
+ accumuloWriter.write(buffer);
}
}
+
+ protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export);
+
}
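The exporter above hands its buffer to a shared writer and blocks until the data is flushed. A rough sketch of that hand-off pattern follows, assuming a generic `Consumer` sink in place of an Accumulo `BatchWriter`. The names `SharedWriterSketch` and `Work` are hypothetical. Unlike the committed `AccumuloWriter`, the sketch keeps its queue in an instance field and uses `put` rather than `add`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class SharedWriterSketch {

  // A submission plus a latch that the submitting thread waits on
  private static final class Work {
    final List<String> items;
    final CountDownLatch done = new CountDownLatch(1);

    Work(Collection<String> items) {
      this.items = new ArrayList<>(items);
    }
  }

  private final LinkedBlockingQueue<Work> queue = new LinkedBlockingQueue<>(10000);

  // The sink stands in for "add mutations, then flush"; it sees one combined batch per drain
  SharedWriterSketch(Consumer<List<String>> sink) {
    Thread worker = new Thread(() -> {
      List<Work> pending = new ArrayList<>();
      try {
        while (true) {
          pending.clear();
          pending.add(queue.take()); // block until at least one submission arrives
          queue.drainTo(pending);    // then collect everything else already queued

          List<String> batch = new ArrayList<>();
          for (Work w : pending) {
            batch.addAll(w.items);
          }
          sink.accept(batch);        // one flush for the whole batch

          for (Work w : pending) {
            w.done.countDown();      // release every thread whose data was flushed
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  // Blocks until the items have been passed to the sink
  void write(Collection<String> items) {
    Work work = new Work(items);
    try {
      queue.put(work);
      work.done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    List<String> flushed = Collections.synchronizedList(new ArrayList<>());
    SharedWriterSketch writer = new SharedWriterSketch(flushed::addAll);
    writer.write(Arrays.asList("m1", "m2"));
    System.out.println(flushed);
  }
}
```

Because many transactions can be waiting at once, a single flush can serve all of them, which is the point of sharing one writer. The sketch does not handle a sink that throws; in that case the waiting threads would never be released.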
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
new file mode 100644
index 0000000..aaea742
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+import org.apache.fluo.recipes.core.transaction.LogEntry;
+import org.apache.fluo.recipes.core.transaction.TxLog;
+
+/**
+ * An {@link AccumuloExporter} that replicates data to Accumulo using a {@link TxLog}
+ */
+public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
+
+ @Override
+ protected Collection<Mutation> processExport(SequencedExport<String, TxLog> export) {
+ return generateMutations(export.getSequence(), export.getValue());
+ }
+
+ /**
+ * Returns LogEntry filter for Accumulo replication
+ */
+ public static Predicate<LogEntry> getFilter() {
+ return le -> le.getOp().equals(LogEntry.Operation.DELETE)
+ || le.getOp().equals(LogEntry.Operation.SET);
+ }
+
+ /**
+ * Generates Accumulo mutations from a transaction log. Used to replicate a Fluo table to Accumulo.
+ *
+ * @param seq Export sequence number
+ * @param txLog Transaction log
+ * @return Collection of mutations
+ */
+ public static Collection<Mutation> generateMutations(long seq, TxLog txLog) {
+ Map<Bytes, Mutation> mutationMap = new HashMap<>();
+ for (LogEntry le : txLog.getLogEntries()) {
+ LogEntry.Operation op = le.getOp();
+ Column col = le.getColumn();
+ byte[] cf = col.getFamily().toArray();
+ byte[] cq = col.getQualifier().toArray();
+ byte[] cv = col.getVisibility().toArray();
+ if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
+ Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray()));
+ if (op.equals(LogEntry.Operation.DELETE)) {
+ if (col.isVisibilitySet()) {
+ m.putDelete(cf, cq, new ColumnVisibility(cv), seq);
+ } else {
+ m.putDelete(cf, cq, seq);
+ }
+ } else {
+ if (col.isVisibilitySet()) {
+ m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray());
+ } else {
+ m.put(cf, cq, seq, le.getValue().toArray());
+ }
+ }
+ }
+ }
+ return mutationMap.values();
+ }
+}
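The grouping done by `AccumuloReplicator.generateMutations` can be illustrated with plain Java types. This is a simplified sketch under stated assumptions: `Op` and `Entry` are hypothetical stand-ins for `LogEntry.Operation` and `LogEntry`, column visibility is ignored, and each row collects textual changes instead of a `Mutation`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplicationSketch {

  // Hypothetical stand-in for the operations recorded in a transaction log
  enum Op { GET, SET, DELETE }

  // Hypothetical stand-in for a log entry; value is null for GET and DELETE
  static final class Entry {
    final Op op;
    final String row;
    final String column;
    final String value;

    Entry(Op op, String row, String column, String value) {
      this.op = op;
      this.row = row;
      this.column = column;
      this.value = value;
    }
  }

  static Map<String, List<String>> toRowChanges(long seq, List<Entry> log) {
    Map<String, List<String>> perRow = new LinkedHashMap<>();
    for (Entry e : log) {
      if (e.op != Op.SET && e.op != Op.DELETE) {
        continue; // only writes are replicated; reads produce no change
      }
      // All changes to one row are collected together, one group per row
      List<String> changes = perRow.computeIfAbsent(e.row, r -> new ArrayList<>());
      if (e.op == Op.DELETE) {
        changes.add("delete " + e.column + " @" + seq);
      } else {
        changes.add("put " + e.column + "=" + e.value + " @" + seq);
      }
    }
    return perRow;
  }

  public static void main(String[] args) {
    List<Entry> log = Arrays.asList(
        new Entry(Op.SET, "r1", "c1", "v1"),
        new Entry(Op.GET, "r1", "c2", null),
        new Entry(Op.DELETE, "r2", "c1", null),
        new Entry(Op.SET, "r1", "c3", "v3"));
    System.out.println(toRowChanges(7, log));
  }
}
```

Every change carries the export sequence number, so a replayed export with an older sequence number does not overwrite newer data in the target table.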
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
deleted file mode 100644
index d6fefa7..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-
-/**
- * Implemented by users to export data to Accumulo by comparing the differences between a
- * RowColumn/Bytes map that is generated for old and new data and represents how the data should
- * exist in Accumulo. When comparing each row/column/value (RCV) of old and new data, mutations are
- * generated using the following rules:
- * <ul>
- * <li>If old and new data have the same RCV, nothing is done.
- * <li>If old and new data have same row/column but different values, an update mutation is created
- * for the row/column.
- * <li>If old data has a row/column that is not in the new data, a delete mutation is generated.
- * <li>If new data has a row/column that is not in the old data, an insert mutation is generated.
- * <li>Only one mutation is generated per row.
- * <li>The export sequence number is used for the timestamp in the mutation.
- * </ul>
- *
- * @param <K> Export queue key type
- * @param <V> Type of export value object used to generate data
- * @since 1.0.0
- */
-public abstract class DifferenceExport<K, V> implements AccumuloExport<K> {
-
- private Optional<V> oldVal;
- private Optional<V> newVal;
-
- public DifferenceExport() {}
-
- public DifferenceExport(Optional<V> oldVal, Optional<V> newVal) {
- Objects.requireNonNull(oldVal);
- Objects.requireNonNull(newVal);
- Preconditions.checkArgument(oldVal.isPresent() || newVal.isPresent(),
- "At least one value must be set");
- this.oldVal = oldVal;
- this.newVal = newVal;
- }
-
- /**
- * Generates RowColumn/Bytes map of how data should exist in Accumulo. This map is generated for
- * old and new data and compared to create export mutations that will be written to Accumulo.
- *
- * @param key Export queue key
- * @param val Export value object
- * @return RowColumn/Bytes map of how data should exist in Accumulo
- */
- protected abstract Map<RowColumn, Bytes> generateData(K key, Optional<V> val);
-
- @Override
- public Collection<Mutation> toMutations(K key, long seq) {
- Map<RowColumn, Bytes> oldData = generateData(key, oldVal);
- Map<RowColumn, Bytes> newData = generateData(key, newVal);
-
- Map<Bytes, Mutation> mutationMap = new HashMap<>();
- for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
- RowColumn rc = entry.getKey();
- if (!newData.containsKey(rc)) {
- Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
- m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
- seq);
- }
- }
- for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
- RowColumn rc = entry.getKey();
- Column col = rc.getColumn();
- Bytes newVal = entry.getValue();
- Bytes oldVal = oldData.get(rc);
- if (oldVal == null || !oldVal.equals(newVal)) {
- Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
- m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
- }
- }
- return mutationMap.values();
- }
-}
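The comparison rules listed in the DifferenceExport Javadoc above can be illustrated without any Accumulo or Fluo dependency. The following is only a minimal sketch under stated assumptions: DiffSketch, RowChange and the nested row -> column -> value maps are hypothetical stand-ins for Mutation and Map<RowColumn, Bytes>, and the string "ops" merely describe what a real mutation would carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DiffSketch {

  /** Hypothetical stand-in for an Accumulo Mutation: one row plus its column changes. */
  static final class RowChange {
    final String row;
    final List<String> ops = new ArrayList<>();

    RowChange(String row) {
      this.row = row;
    }
  }

  /**
   * Compares old and new data (row -> column -> value) and returns at most one RowChange per
   * row. The seq argument plays the role of the timestamp written into each change.
   */
  static Map<String, RowChange> diff(Map<String, Map<String, String>> oldData,
      Map<String, Map<String, String>> newData, long seq) {
    Map<String, RowChange> changes = new TreeMap<>();

    // A row/column present only in the old data becomes a delete.
    oldData.forEach((row, cols) -> cols.forEach((col, val) -> {
      Map<String, String> newCols = newData.getOrDefault(row, Collections.emptyMap());
      if (!newCols.containsKey(col)) {
        changes.computeIfAbsent(row, RowChange::new).ops.add("delete " + col + " @" + seq);
      }
    }));

    // A row/column that is new, or whose value changed, becomes a put.
    // Identical row/column/value triples produce nothing.
    newData.forEach((row, cols) -> cols.forEach((col, val) -> {
      String oldVal = oldData.getOrDefault(row, Collections.emptyMap()).get(col);
      if (!val.equals(oldVal)) {
        changes.computeIfAbsent(row, RowChange::new).ops
            .add("put " + col + "=" + val + " @" + seq);
      }
    }));

    return changes;
  }
}
```

As in the removed class, inserts and updates are not distinguished: both end up as a put on the row's single change object.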
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
deleted file mode 100644
index 9276c6d..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Predicate;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.transaction.LogEntry;
-import org.apache.fluo.recipes.core.transaction.TxLog;
-
-/**
- * An implementation of {@link AccumuloExport} that replicates a Fluo table to Accumulo using a
- * TxLog
- *
- * @param <K> Type of export queue key
- * @since 1.0.0
- */
-public class ReplicationExport<K> implements AccumuloExport<K> {
-
- private TxLog txLog;
-
- public ReplicationExport() {}
-
- public ReplicationExport(TxLog txLog) {
- Objects.requireNonNull(txLog);
- this.txLog = txLog;
- }
-
- public static Predicate<LogEntry> getFilter() {
- return le -> le.getOp().equals(LogEntry.Operation.DELETE)
- || le.getOp().equals(LogEntry.Operation.SET);
- }
-
- @Override
- public Collection<Mutation> toMutations(K key, long seq) {
- Map<Bytes, Mutation> mutationMap = new HashMap<>();
- for (LogEntry le : txLog.getLogEntries()) {
- LogEntry.Operation op = le.getOp();
- Column col = le.getColumn();
- byte[] cf = col.getFamily().toArray();
- byte[] cq = col.getQualifier().toArray();
- byte[] cv = col.getVisibility().toArray();
- if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
- Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray()));
- if (op.equals(LogEntry.Operation.DELETE)) {
- if (col.isVisibilitySet()) {
- m.putDelete(cf, cq, new ColumnVisibility(cv), seq);
- } else {
- m.putDelete(cf, cq, seq);
- }
- } else {
- if (col.isVisibilitySet()) {
- m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray());
- } else {
- m.put(cf, cq, seq, le.getValue().toArray());
- }
- }
- }
- }
- return mutationMap.values();
- }
-}
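The replay logic in the removed toMutations method above (keep only SET and DELETE entries, group them by row, stamp each with the export sequence number) could be sketched in plain Java roughly as follows. This is an illustration only: ReplaySketch, Op and Change are hypothetical stand-ins for TxLog, LogEntry.Operation and LogEntry, column visibility is left out, and the returned strings only describe what a real Mutation would contain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {

  /** Hypothetical operation types; only SET and DELETE are replicated. */
  enum Op {
    GET, SET, DELETE
  }

  /** Hypothetical stand-in for one transaction log entry. */
  static final class Change {
    final Op op;
    final String row;
    final String column;
    final String value;

    Change(Op op, String row, String column, String value) {
      this.op = op;
      this.row = row;
      this.column = column;
      this.value = value;
    }
  }

  /** Groups the replicable entries of a log by row, preserving log order within each row. */
  static Map<String, List<String>> replay(List<Change> log, long seq) {
    Map<String, List<String>> byRow = new LinkedHashMap<>();
    for (Change c : log) {
      if (c.op != Op.SET && c.op != Op.DELETE) {
        continue; // anything else (for example a read) does not alter the target table
      }
      List<String> ops = byRow.computeIfAbsent(c.row, r -> new ArrayList<>());
      if (c.op == Op.DELETE) {
        ops.add("delete " + c.column + " @" + seq);
      } else {
        ops.add("put " + c.column + "=" + c.value + " @" + seq);
      }
    }
    return byRow;
  }
}
```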
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
deleted file mode 100644
index 9d189e6..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-
-class SharedBatchWriter {
-
- private static class Mutations {
-
- List<Mutation> mutations;
- CountDownLatch cdl = new CountDownLatch(1);
-
- Mutations(Collection<Mutation> mutations) {
- this.mutations = new ArrayList<>(mutations);
- }
- }
-
- private static class ExportTask implements Runnable {
-
- private BatchWriter bw;
-
- ExportTask(String instanceName, String zookeepers, String user, String password, String table)
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- ZooKeeperInstance zki =
- new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(
- zookeepers));
-
- // TODO need to close batch writer
- Connector conn = zki.getConnector(user, new PasswordToken(password));
- try {
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- } catch (TableNotFoundException tnfe) {
- try {
- conn.tableOperations().create(table);
- } catch (TableExistsException e) {
- // nothing to do
- }
-
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- }
- }
-
- @Override
- public void run() {
-
- ArrayList<Mutations> exports = new ArrayList<>();
-
- while (true) {
- try {
- exports.clear();
-
- // gather export from all threads that have placed an item on the queue
- exports.add(exportQueue.take());
- exportQueue.drainTo(exports);
-
- for (Mutations ml : exports) {
- bw.addMutations(ml.mutations);
- }
-
- bw.flush();
-
- // notify all threads waiting after flushing
- for (Mutations ml : exports) {
- ml.cdl.countDown();
- }
-
- } catch (InterruptedException | MutationsRejectedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- }
-
- private static LinkedBlockingQueue<Mutations> exportQueue = null;
-
- private SharedBatchWriter(String instanceName, String zookeepers, String user, String password,
- String table) throws Exception {
-
- // TODO: fix this write to static and remove findbugs max rank override in pom.xml
- exportQueue = new LinkedBlockingQueue<>(10000);
- Thread queueProcessingTask =
- new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
-
- queueProcessingTask.setDaemon(true);
- queueProcessingTask.start();
- }
-
- private static Map<String, SharedBatchWriter> exporters = new HashMap<>();
-
- static synchronized SharedBatchWriter getInstance(String instanceName, String zookeepers,
- String user, String password, String table) throws Exception {
-
- String key =
- instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
-
- SharedBatchWriter ret = exporters.get(key);
-
- if (ret == null) {
- ret = new SharedBatchWriter(instanceName, zookeepers, user, password, table);
- exporters.put(key, ret);
- }
-
- return ret;
- }
-
- void write(Collection<Mutation> mutations) {
- Mutations work = new Mutations(mutations);
- exportQueue.add(work);
- try {
- work.cdl.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-}
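The removed SharedBatchWriter above batches writes from many threads behind a single flush: callers enqueue their mutations and wait on a latch, while one daemon thread drains the queue, flushes once, and releases every waiter. A minimal, dependency-free sketch of that pattern might look like the following. The names GroupFlushSketch, Work and the flusher callback are assumptions made for illustration and stand in for the BatchWriter calls. Unlike the removed class, the queue here is an instance field rather than a static, and put (blocking) is used instead of add. Error handling for a failing flush is omitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class GroupFlushSketch<T> {

  /** One caller's items plus the latch that caller waits on. */
  private static final class Work<T> {
    final List<T> items;
    final CountDownLatch done = new CountDownLatch(1);

    Work(Collection<T> items) {
      this.items = new ArrayList<>(items);
    }
  }

  private final BlockingQueue<Work<T>> queue = new LinkedBlockingQueue<>(10000);

  /** The flusher is a hypothetical callback standing in for addMutations() plus flush(). */
  public GroupFlushSketch(Consumer<List<T>> flusher) {
    Thread worker = new Thread(() -> {
      List<Work<T>> batch = new ArrayList<>();
      try {
        while (true) {
          batch.clear();
          // Block for the first item, then grab whatever else is already queued.
          batch.add(queue.take());
          queue.drainTo(batch);

          List<T> all = new ArrayList<>();
          for (Work<T> w : batch) {
            all.addAll(w.items);
          }
          flusher.accept(all);

          // Release every caller only after the shared flush has completed.
          for (Work<T> w : batch) {
            w.done.countDown();
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  /** Enqueues the items and blocks until the worker thread has flushed them. */
  public void write(Collection<T> items) {
    Work<T> work = new Work<>(items);
    try {
      queue.put(work);
      work.done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

The benefit of this design is that concurrent callers share the cost of one flush instead of each paying for their own.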
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
deleted file mode 100644
index 9da6773..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-/**
- * @since 1.0.0
- */
-public class TableInfo {
- String instanceName;
- String zookeepers;
- String user;
- String password;
- String table;
-
- public TableInfo(String instanceName, String zookeepers, String user, String password,
- String table) {
- this.instanceName = instanceName;
- this.zookeepers = zookeepers;
- this.user = user;
- this.password = password;
- this.table = table;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
new file mode 100644
index 0000000..e4be08a
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloExportTest {
+
+ public static Map<RowColumn, Bytes> genData(String key, Optional<String> val) {
+ if (!val.isPresent()) {
+ return Collections.emptyMap();
+ }
+ Map<RowColumn, Bytes> rcMap = new HashMap<>();
+ String data = val.get();
+ for (int i = 0; i < data.length(); i++) {
+ char c = data.charAt(i);
+ rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c));
+ }
+ return rcMap;
+ }
+
+ public static Collection<Mutation> genMutations(String key, long seq, Optional<String> oldVal,
+ Optional<String> newVal) {
+ return AccumuloExportQueue.generateMutations(seq, genData(key, oldVal), genData(key, newVal));
+ }
+
+ public static Mutation makePut(String key, String val, long seq) {
+ Mutation m = new Mutation("r:" + key);
+ addPut(m, key, val, seq);
+ return m;
+ }
+
+ public static void addPut(Mutation m, String key, String val, long seq) {
+ m.put("cf:" + val, "", seq, "v:" + val);
+ }
+
+ public static Mutation makeDel(String key, String val, long seq) {
+ Mutation m = new Mutation("r:" + key);
+ addDel(m, key, val, seq);
+ return m;
+ }
+
+ public static void addDel(Mutation m, String key, String val, long seq) {
+ m.putDelete("cf:" + val, "", seq);
+ }
+
+ @Test
+ public void testDifferenceExport() {
+ Collection<Mutation> mutations;
+
+ mutations = genMutations("k1", 1, Optional.empty(), Optional.of("a"));
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
+
+ mutations = genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"));
+ Assert.assertEquals(0, mutations.size());
+
+ mutations = genMutations("k2", 2, Optional.of("b"), Optional.of("ab"));
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
+
+ mutations = genMutations("k3", 3, Optional.of("c"), Optional.of("d"));
+ Assert.assertEquals(1, mutations.size());
+ Mutation m = makeDel("k3", "c", 3);
+ addPut(m, "k3", "d", 3);
+ Assert.assertTrue(mutations.contains(m));
+
+ mutations = genMutations("k4", 4, Optional.of("e"), Optional.empty());
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
+
+ mutations = genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"));
+ Assert.assertEquals(1, mutations.size());
+ m = makeDel("k5", "e", 5);
+ addPut(m, "k5", "g", 5);
+ Assert.assertTrue(mutations.contains(m));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
deleted file mode 100644
index a6e1188..0000000
--- a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class DifferenceExportTest {
-
- public class DiffExport extends DifferenceExport<String, String> {
-
- DiffExport(Optional<String> oldV, Optional<String> newV) {
- super(oldV, newV);
- }
-
- @Override
- protected Map<RowColumn, Bytes> generateData(String key, Optional<String> val) {
- if (!val.isPresent()) {
- return Collections.emptyMap();
- }
- Map<RowColumn, Bytes> rcMap = new HashMap<>();
- String data = val.get();
- for (int i = 0; i < data.length(); i++) {
- char c = data.charAt(i);
- rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c));
- }
- return rcMap;
- }
- }
-
- public static Mutation makePut(String key, String val, long seq) {
- Mutation m = new Mutation("r:" + key);
- addPut(m, key, val, seq);
- return m;
- }
-
- public static void addPut(Mutation m, String key, String val, long seq) {
- m.put("cf:" + val, "", seq, "v:" + val);
- }
-
- public static Mutation makeDel(String key, String val, long seq) {
- Mutation m = new Mutation("r:" + key);
- addDel(m, key, val, seq);
- return m;
- }
-
- public static void addDel(Mutation m, String key, String val, long seq) {
- m.putDelete("cf:" + val, "", seq);
- }
-
- @Test
- public void testDifferenceExport() {
- Collection<Mutation> mutations;
-
- mutations = new DiffExport(Optional.empty(), Optional.of("a")).toMutations("k1", 1);
- Assert.assertEquals(1, mutations.size());
- Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
-
- mutations = new DiffExport(Optional.of("ab"), Optional.of("ab")).toMutations("k2", 2);
- Assert.assertEquals(0, mutations.size());
-
- mutations = new DiffExport(Optional.of("b"), Optional.of("ab")).toMutations("k2", 2);
- Assert.assertEquals(1, mutations.size());
- Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
-
- mutations = new DiffExport(Optional.of("c"), Optional.of("d")).toMutations("k3", 3);
- Assert.assertEquals(1, mutations.size());
- Mutation m = makeDel("k3", "c", 3);
- addPut(m, "k3", "d", 3);
- Assert.assertTrue(mutations.contains(m));
-
- mutations = new DiffExport(Optional.of("e"), Optional.empty()).toMutations("k4", 4);
- Assert.assertEquals(1, mutations.size());
- Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
-
- mutations = new DiffExport(Optional.of("ef"), Optional.of("fg")).toMutations("k5", 5);
- Assert.assertEquals(1, mutations.size());
- m = makeDel("k5", "e", 5);
- addPut(m, "k5", "g", 5);
- Assert.assertTrue(mutations.contains(m));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index 81d277b..fcc3a74 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@ -262,6 +262,10 @@ public class ExportQueue<K, V> {
return bucketsPerTablet;
}
+ public String getQueueId() {
+ return queueId;
+ }
+
void save(SimpleConfiguration appConfig) {
appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
index 38986df..38985f1 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
@@ -29,10 +29,8 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
-import org.apache.fluo.recipes.accumulo.export.TableInfo;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
import org.apache.hadoop.io.Text;
@@ -41,30 +39,28 @@ import org.junit.Test;
public class AccumuloExporterIT extends AccumuloExportITBase {
- private String et;
+ private String exportTable;
public static final String QUEUE_ID = "aeqt";
@Override
public void preFluoInitHook() throws Exception {
- FluoConfiguration fluoConfig = getFluoConfiguration();
- ExportQueue.configure(fluoConfig,
- new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), String.class.getName(),
- TestExport.class.getName(), 5).setBucketsPerTablet(1));
-
// create and configure export table
- et = "export" + tableCounter.getAndIncrement();
- getAccumuloConnector().tableOperations().create(et);
+ exportTable = "export" + tableCounter.getAndIncrement();
+ getAccumuloConnector().tableOperations().create(exportTable);
+
MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
- AccumuloExporter.setExportTableInfo(fluoConfig, QUEUE_ID,
- new TableInfo(miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER,
- ACCUMULO_PASSWORD, et));
+
+ AccumuloExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID,
+ SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), 5)
+ .setBucketsPerTablet(1), new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(),
+ miniAccumulo.getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
}
@Test
public void testAccumuloExport() throws Exception {
- ExportQueue<String, TestExport> teq =
+ ExportQueue<String, String> teq =
ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration());
Assert.assertEquals(6, getFluoSplits().size());
@@ -122,9 +118,9 @@ public class AccumuloExporterIT extends AccumuloExportITBase {
}
}
- private void export(ExportQueue<String, TestExport> teq, Transaction tx,
+ private void export(ExportQueue<String, String> teq, Transaction tx,
Map<String, String> expected, String k, String v) {
- teq.add(tx, k, new TestExport(v));
+ teq.add(tx, k, v);
expected.put(k, v);
}
@@ -134,7 +130,7 @@ public class AccumuloExporterIT extends AccumuloExportITBase {
}
private Map<String, String> getExports() throws Exception {
- Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY);
+ Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY);
Map<String, String> ret = new HashMap<>();
for (Entry<Key, Value> entry : scanner) {
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
index 432fa4d..e5b4e1a 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
@@ -27,13 +27,11 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
-import org.apache.fluo.recipes.accumulo.export.ReplicationExport;
-import org.apache.fluo.recipes.accumulo.export.TableInfo;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
+import org.apache.fluo.recipes.accumulo.export.AccumuloReplicator;
import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.transaction.TxLog;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
import org.apache.fluo.recipes.core.transaction.RecordingTransaction;
import org.apache.fluo.recipes.core.types.StringEncoder;
@@ -44,32 +42,31 @@ import org.junit.Test;
public class AccumuloReplicatorIT extends AccumuloExportITBase {
- private String et;
- public static final String QUEUE_ID = "aeqt";
+ private String exportTable;
+ public static final String QUEUE_ID = "repq";
private TypeLayer tl = new TypeLayer(new StringEncoder());
@Override
public void preFluoInitHook() throws Exception {
- ExportQueue
- .configure(
- getFluoConfiguration(),
- new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), Bytes.class
- .getName(), AccumuloExport.class.getName(), 5));
// create and configure export table
- et = "export" + tableCounter.getAndIncrement();
- getAccumuloConnector().tableOperations().create(et);
+ exportTable = "export" + tableCounter.getAndIncrement();
+ getAccumuloConnector().tableOperations().create(exportTable);
MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
- AccumuloExporter.setExportTableInfo(getFluoConfiguration(), QUEUE_ID, new TableInfo(
- miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER,
- ACCUMULO_PASSWORD, et));
+
+ AccumuloExportQueue.configure(
+ getFluoConfiguration(),
+ new ExportQueue.Options(QUEUE_ID, AccumuloReplicator.class.getName(), String.class
+ .getName(), TxLog.class.getName(), 5),
+ new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), miniAccumulo
+ .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
}
@Test
public void testAccumuloReplicator() throws Exception {
- ExportQueue<Bytes, AccumuloExport<?>> eq =
+ ExportQueue<String, TxLog> eq =
ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration());
MiniFluo miniFluo = getMiniFluo();
@@ -78,12 +75,12 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
Map<String, String> expected = new HashMap<>();
try (Transaction tx = fc.newTransaction()) {
- RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+ RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
TypedTransaction ttx = tl.wrap(rtx);
write(ttx, expected, "k1", "v1");
write(ttx, expected, "k2", "v2");
write(ttx, expected, "k3", "v3");
- eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+ eq.add(tx, "q1", rtx.getTxLog());
tx.commit();
}
@@ -91,13 +88,13 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
Assert.assertEquals(expected, getExports());
try (Transaction tx = fc.newTransaction()) {
- RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+ RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
TypedTransaction ttx = tl.wrap(rtx);
write(ttx, expected, "k1", "v4");
delete(ttx, expected, "k3");
write(ttx, expected, "k2", "v5");
write(ttx, expected, "k4", "v6");
- eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+ eq.add(tx, "q1", rtx.getTxLog());
tx.commit();
}
@@ -105,13 +102,13 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
Assert.assertEquals(expected, getExports());
try (Transaction tx = fc.newTransaction()) {
- RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+ RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
TypedTransaction ttx = tl.wrap(rtx);
write(ttx, expected, "k2", "v7");
write(ttx, expected, "k3", "v8");
delete(ttx, expected, "k1");
delete(ttx, expected, "k4");
- eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+ eq.add(tx, "q1", rtx.getTxLog());
tx.commit();
}
@@ -131,7 +128,7 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
}
private Map<String, String> getExports() throws Exception {
- Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY);
+ Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY);
Map<String, String> ret = new HashMap<>();
for (Entry<Key, Value> entry : scanner) {
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
new file mode 100644
index 0000000..5aa5812
--- /dev/null
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.test.export;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+
+public class SimpleExporter extends AccumuloExporter<String, String> {
+
+  @Override
+  protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+    Mutation m = new Mutation(export.getKey());
+    m.put("cf", "cq", export.getSequence(), export.getValue());
+    return Collections.singleton(m);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
deleted file mode 100644
index 9132e0b..0000000
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.test.export;
-
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
-
-public class TestExport implements AccumuloExport<String> {
-
-  private String value;
-
-  public TestExport() {}
-
-  public TestExport(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public Collection<Mutation> toMutations(String key, long seq) {
-    Mutation m = new Mutation(key);
-    m.put("cf", "cq", seq, value);
-    return Collections.singletonList(m);
-  }
-}