You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2016/09/21 16:36:37 UTC

incubator-fluo-recipes git commit: Fixes #102 - Simplify Accumulo export queue recipe

Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master d7347ee15 -> 6f5177363


Fixes #102 - Simplify Accumulo export queue recipe

* AccumuloExporter is now abstract class that is implemented by user
  and handles writing mutation given by user to Accumulo.
* AccumuloExport objects are no longer placed on queue
* Renamed SharedBatchWriter to AccumuloWriter
* AccumuloExportQueue class contains code for configuring queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/6f517736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/6f517736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/6f517736

Branch: refs/heads/master
Commit: 6f5177363ca309c35403fd822e3270559e9dfdbd
Parents: d7347ee
Author: Mike Walch <mw...@apache.org>
Authored: Wed Sep 7 12:04:44 2016 -0400
Committer: Mike Walch <mw...@apache.org>
Committed: Wed Sep 21 12:30:22 2016 -0400

----------------------------------------------------------------------
 docs/accumulo-export-queue.md                   |  89 ++++++
 docs/accumulo-export.md                         |  57 ----
 docs/export-queue.md                            |   2 +-
 modules/accumulo/pom.xml                        |   4 -
 .../recipes/accumulo/export/AccumuloExport.java |  37 ---
 .../accumulo/export/AccumuloExportQueue.java    | 278 +++++++++++++++++++
 .../accumulo/export/AccumuloExporter.java       |  45 +--
 .../accumulo/export/AccumuloReplicator.java     |  83 ++++++
 .../accumulo/export/DifferenceExport.java       | 101 -------
 .../accumulo/export/ReplicationExport.java      |  82 ------
 .../accumulo/export/SharedBatchWriter.java      | 149 ----------
 .../fluo/recipes/accumulo/export/TableInfo.java |  36 ---
 .../accumulo/export/AccumuloExportTest.java     | 102 +++++++
 .../accumulo/export/DifferenceExportTest.java   | 105 -------
 .../fluo/recipes/core/export/ExportQueue.java   |   4 +
 .../recipes/test/export/AccumuloExporterIT.java |  32 +--
 .../test/export/AccumuloReplicatorIT.java       |  47 ++--
 .../recipes/test/export/SimpleExporter.java     |  33 +++
 .../fluo/recipes/test/export/TestExport.java    |  40 ---
 19 files changed, 641 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
new file mode 100644
index 0000000..dde04fb
--- /dev/null
+++ b/docs/accumulo-export-queue.md
@@ -0,0 +1,89 @@
+# Accumulo Export Queue Specialization
+
+## Background
+
+The [Export Queue Recipe][1] provides a generic foundation for building export mechanism to any
+external data store. The [AccumuloExportQueue] provides an implementation of this recipe for
+Accumulo. The [AccumuloExportQueue] is located the 'fluo-recipes-accumulo' module and provides the
+following functionality:
+
+ * Safely batches writes to Accumulo made by multiple transactions exporting data.
+ * Stores Accumulo connection information in Fluo configuration, making it accessible by Export
+   Observers running on other nodes.
+ * Provides utility code that make it easier and shorter to code common Accumulo export patterns.
+
+## Example Use
+
+Exporting to Accumulo is easy. Follow the steps below:
+
+1. Implement a class that extends [AccumuloExporter].  This class will process exported objects that
+   are placed on your export queue. For example, the `SimpleExporter` class below processes String
+   key/value exports and generates mutations for Accumulo.
+
+    ```java
+    public class SimpleExporter extends AccumuloExporter<String, String> {
+
+      @Override
+      protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+        Mutation m = new Mutation(export.getKey());
+        m.put("cf", "cq", export.getSequence(), export.getValue());
+        return Collections.singleton(m);
+      }
+    }
+    ```
+
+2. With a `SimpleExporter` created, configure a [AccumuloExportQueue] to use `SimpleExporter` and
+   give it information on how to connect to Accumulo. 
+
+    ```java
+
+    FluoConfiguration fluoConfig = ...;
+
+    // Set accumulo configuration
+    String instance =       // Name of accumulo instance exporting to
+    String zookeepers =     // Zookeepers used by Accumulo instance exporting to
+    String user =           // Accumulo username, user that can write to exportTable
+    String password =       // Accumulo user password
+    String exportTable =    // Name of table to export to
+
+    // Configure accumulo export queue
+    AccumuloExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
+        SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), numMapBuckets),
+        new AccumuloExportQueue.Options(instance, zookeepers, user, password, exportTable));
+
+    // Initialize Fluo using fluoConfig
+    ```
+
+3.  Export queues can be retrieved in Fluo observers and objects can be added to them:
+
+    ```java
+    public class MyObserver extends AbstractObserver {
+
+      ExportQueue<String, String> exportQ;
+
+      @Override
+      public void init(Context context) throws Exception {
+        exportQ = ExportQueue.getInstance(EXPORT_QUEUE_ID, context.getAppConfiguration());
+      }
+
+      @Override
+      public void process(TransactionBase tx, Bytes row, Column col) {
+
+        // Read some data and do some work
+
+        // Add results to export queue
+        String key =    // key that identifies export
+        String value =  // object to export
+        export.add(tx, key, value);
+      }
+    ```
+
+## Other use cases
+
+[AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a Fluo table to Accumulo.
+
+[1]: export-queue.md
+[AccumuloExportQueue]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
+[AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+[AccumuloReplicator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
+

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export.md b/docs/accumulo-export.md
deleted file mode 100644
index 4604f2c..0000000
--- a/docs/accumulo-export.md
+++ /dev/null
@@ -1,57 +0,0 @@
-# Accumulo Export Queue Specialization
-
-## Background
-
-The [Export Queue Recipe][1] provides a generic foundation for building export 
-mechanism to any external data store.  A specific Export Queue implementation
-for Accumulo is provided in the Fluo Recipes Accumulo module.
-
-This implementation provides the following functionality :
-
- * Safely batches writes to Accumulo made by multiple transactions exporting data.
- * Stores Accumulo connection information in Fluo configuration, making it accessible by Export Observers running on other nodes.
- * Provides utility code that make it easier and shorter to code common Accumulo export patterns.
-
-## Example Use
-
-Exporting to Accumulo is easy.  Only two things need to be done.
-
- * Configure the export queue.
- * Implement [AccumuloExport][2] with custom code that generates Accumulo mutations.
-
-The following code shows how to configure an Export Queue that will write to an
-external Accumulo table.
-
-```java
-
-FluoConfiguration fluoConfig = ...;
-
-//Configure an export queue to use the classes Fluo Recipes provides for
-//exporting to Accumulo
-ExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
-    AccumuloExporter.class.getName(), String.class.getName(), AccumuloExport.class.getName(),
-    numMapBuckets));
-
-String instance = //Name of accumulo instance exporting to
-String zookeepers = //zookeepers used by Accumulo instance exporting to
-String user = //Accumulo username, user that can write to exportTable
-String password = //Accumulo user password
-String exportTable = //Name of table to export to
-
-//Configure the Accumulo table to export to.
-AccumuloExporter.setExportTableInfo(fluoConfig, EXPORT_QUEUE_ID,
-    new TableInfo(instance, zookeepers, user, password, exportTable));
-
-//initialize Fluo using fluoConfig
-
-```
-
-After the export queue is initialized as specified above, any Object that
-implements [AccumuloExport][2] can be added to the queue.  For the common
-pattern of deleting old data and inserting new data, consider extending
-[DifferenceExport][3].
-
-[1]: export-queue.md
-[2]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
-[3]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
-

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index 0120113..c366e9d 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -281,5 +281,5 @@ example of write skew mentioned in the Percolater paper.
 
 [1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
 [2]: https://en.wikipedia.org/wiki/Serializability
-[3]: accumulo-export.md
+[3]: accumulo-export-queue.md
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index 03df1e5..945b9fc 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -25,10 +25,6 @@
   <name>Fluo Recipes Accumulo</name>
   <dependencies>
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
deleted file mode 100644
index 9b31b7c..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-
-import org.apache.accumulo.core.data.Mutation;
-
-/**
- * Implemented by users to export data to Accumulo.
- * 
- * @param <K> Export queue key type
- * @since 1.0.0
- */
-public interface AccumuloExport<K> {
-
-  /**
-   * Creates mutations for export from user's data
-   * 
-   * @param key Export queue key
-   * @param seq Export sequence number
-   */
-  Collection<Mutation> toMutations(K key, long seq);
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
new file mode 100644
index 0000000..fdba5f3
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.recipes.core.export.ExportQueue;
+
+public class AccumuloExportQueue {
+
+  /**
+   * Configures AccumuloExportQueue
+   *
+   * @param fc Fluo configuration
+   * @param eqo Export queue options
+   * @param ao Accumulo export queue options
+   */
+  public static void configure(FluoConfiguration fc, ExportQueue.Options eqo, Options ao) {
+    ExportQueue.configure(fc, eqo);
+    AccumuloWriter.setConfig(fc.getAppConfiguration(), eqo.getQueueId(), ao);
+  }
+
+  /**
+   * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is
+   * generated for old and new data and represents how the data should exist in Accumulo. When
+   * comparing each row/column/value (RCV) of old and new data, mutations are generated using the
+   * following rules:
+   * <ul>
+   * <li>If old and new data have the same RCV, nothing is done.
+   * <li>If old and new data have same row/column but different values, an update mutation is
+   * created for the row/column.
+   * <li>If old data has a row/column that is not in the new data, a delete mutation is generated.
+   * <li>If new data has a row/column that is not in the old data, an insert mutation is generated.
+   * <li>Only one mutation is generated per row.
+   * <li>The export sequence number is used for the timestamp in the mutation.
+   * </ul>
+   *
+   * @param oldData Map containing old row/column data
+   * @param newData Map containing new row/column data
+   * @param seq Export sequence number
+   */
+  public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData,
+      Map<RowColumn, Bytes> newData) {
+    Map<Bytes, Mutation> mutationMap = new HashMap<>();
+    for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
+      RowColumn rc = entry.getKey();
+      if (!newData.containsKey(rc)) {
+        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
+        m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
+            seq);
+      }
+    }
+    for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
+      RowColumn rc = entry.getKey();
+      Column col = rc.getColumn();
+      Bytes newVal = entry.getValue();
+      Bytes oldVal = oldData.get(rc);
+      if (oldVal == null || !oldVal.equals(newVal)) {
+        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
+        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
+      }
+    }
+    return mutationMap.values();
+  }
+
+  /**
+   * Writes mutations to Accumulo using a shared batch writer
+   *
+   * @since 1.0.0
+   */
+  static class AccumuloWriter {
+
+    private static class Mutations {
+      List<Mutation> mutations;
+      CountDownLatch cdl = new CountDownLatch(1);
+
+      Mutations(Collection<Mutation> mutations) {
+        this.mutations = new ArrayList<>(mutations);
+      }
+    }
+
+    /**
+     * Sets AccumuloWriter config in app configuration
+     */
+    static void setConfig(SimpleConfiguration sc, String id, Options ac) {
+      String prefix = "recipes.accumulo.writer." + id;
+      sc.setProperty(prefix + ".instance", ac.instanceName);
+      sc.setProperty(prefix + ".zookeepers", ac.zookeepers);
+      sc.setProperty(prefix + ".user", ac.user);
+      sc.setProperty(prefix + ".password", ac.password);
+      sc.setProperty(prefix + ".table", ac.table);
+    }
+
+    /**
+     * Gets Accumulo Options from app configuration
+     */
+    static Options getConfig(SimpleConfiguration sc, String id) {
+      String prefix = "recipes.accumulo.writer." + id;
+      String instanceName = sc.getString(prefix + ".instance");
+      String zookeepers = sc.getString(prefix + ".zookeepers");
+      String user = sc.getString(prefix + ".user");
+      String password = sc.getString(prefix + ".password");
+      String table = sc.getString(prefix + ".table");
+      return new Options(instanceName, zookeepers, user, password, table);
+    }
+
+    private static class ExportTask implements Runnable {
+
+      private BatchWriter bw;
+
+      ExportTask(String instanceName, String zookeepers, String user, String password, String table)
+          throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+        ZooKeeperInstance zki =
+            new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(
+                zookeepers));
+
+        // TODO need to close batch writer
+        Connector conn = zki.getConnector(user, new PasswordToken(password));
+        try {
+          bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        } catch (TableNotFoundException tnfe) {
+          try {
+            conn.tableOperations().create(table);
+          } catch (TableExistsException e) {
+            // nothing to do
+          }
+
+          bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        }
+      }
+
+      @Override
+      public void run() {
+
+        ArrayList<Mutations> exports = new ArrayList<>();
+
+        while (true) {
+          try {
+            exports.clear();
+
+            // gather export from all threads that have placed an item on the queue
+            exports.add(exportQueue.take());
+            exportQueue.drainTo(exports);
+
+            for (Mutations ml : exports) {
+              bw.addMutations(ml.mutations);
+            }
+
+            bw.flush();
+
+            // notify all threads waiting after flushing
+            for (Mutations ml : exports) {
+              ml.cdl.countDown();
+            }
+
+          } catch (InterruptedException | MutationsRejectedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+
+    }
+
+    private static LinkedBlockingQueue<Mutations> exportQueue = null;
+
+    private AccumuloWriter(String instanceName, String zookeepers, String user, String password,
+        String table) {
+
+      // TODO: fix this write to static and remove findbugs max rank override in pom.xml
+      exportQueue = new LinkedBlockingQueue<>(10000);
+
+      try {
+        Thread queueProcessingTask =
+            new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
+        queueProcessingTask.setDaemon(true);
+        queueProcessingTask.start();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    private static Map<String, AccumuloWriter> exporters = new HashMap<>();
+
+
+    static AccumuloWriter getInstance(SimpleConfiguration sc, String id) {
+      return getInstance(getConfig(sc, id));
+    }
+
+    static AccumuloWriter getInstance(Options ac) {
+      return getInstance(ac.instanceName, ac.zookeepers, ac.user, ac.password, ac.table);
+    }
+
+    static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers,
+        String user, String password, String table) {
+
+      String key =
+          instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
+
+      AccumuloWriter ret = exporters.get(key);
+
+      if (ret == null) {
+        ret = new AccumuloWriter(instanceName, zookeepers, user, password, table);
+        exporters.put(key, ret);
+      }
+
+      return ret;
+    }
+
+    void write(Collection<Mutation> mutations) {
+      Mutations work = new Mutations(mutations);
+      exportQueue.add(work);
+      try {
+        work.cdl.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  /**
+   * Accumulo export queue options
+   *
+   * @since 1.0.0
+   */
+  public static class Options {
+    String instanceName;
+    String zookeepers;
+    String user;
+    String password;
+    String table;
+
+    public Options(String instanceName, String zookeepers, String user, String password,
+        String table) {
+      this.instanceName = instanceName;
+      this.zookeepers = zookeepers;
+      this.user = user;
+      this.password = password;
+      this.table = table;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index 7e48d12..54fc8ca 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -16,60 +16,45 @@
 package org.apache.fluo.recipes.accumulo.export;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.observer.Observer.Context;
 import org.apache.fluo.recipes.core.export.Exporter;
 import org.apache.fluo.recipes.core.export.SequencedExport;
 
 /**
- * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo
+ * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a
+ * {@link AccumuloExportQueue.AccumuloWriter}
  *
- * @param <K> Export queue key type
  * @since 1.0.0
  */
-public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
+public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
 
-  private SharedBatchWriter sbw;
+  private AccumuloExportQueue.AccumuloWriter accumuloWriter;
 
   @Override
   public void init(String queueId, Context context) throws Exception {
-
-    SimpleConfiguration appConf = context.getAppConfiguration();
-
-    String instanceName = appConf.getString("recipes.accumuloExporter." + queueId + ".instance");
-    String zookeepers = appConf.getString("recipes.accumuloExporter." + queueId + ".zookeepers");
-    String user = appConf.getString("recipes.accumuloExporter." + queueId + ".user");
-    // TODO look into using delegation token
-    String password = appConf.getString("recipes.accumuloExporter." + queueId + ".password");
-    String table = appConf.getString("recipes.accumuloExporter." + queueId + ".table");
-
-    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, password, table);
-  }
-
-  public static void setExportTableInfo(FluoConfiguration fconf, String queueId, TableInfo ti) {
-    SimpleConfiguration appConf = fconf.getAppConfiguration();
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".instance", ti.instanceName);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".zookeepers", ti.zookeepers);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", ti.user);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".password", ti.password);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", ti.table);
+    accumuloWriter =
+        AccumuloExportQueue.AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
   }
 
   @Override
-  protected void processExports(Iterator<SequencedExport<K, AccumuloExport<K>>> exports) {
+  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
+
     ArrayList<Mutation> buffer = new ArrayList<>();
 
     while (exports.hasNext()) {
-      SequencedExport<K, AccumuloExport<K>> export = exports.next();
-      buffer.addAll(export.getValue().toMutations(export.getKey(), export.getSequence()));
+      SequencedExport<K, V> export = exports.next();
+      buffer.addAll(processExport(export));
     }
 
     if (buffer.size() > 0) {
-      sbw.write(buffer);
+      accumuloWriter.write(buffer);
     }
   }
+
+  protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
new file mode 100644
index 0000000..aaea742
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+import org.apache.fluo.recipes.core.transaction.LogEntry;
+import org.apache.fluo.recipes.core.transaction.TxLog;
+
+/**
+ * An {@link AccumuloExporter} that replicates data to Accumulo using a {@link TxLog}
+ */
+public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
+
+  @Override
+  protected Collection<Mutation> processExport(SequencedExport<String, TxLog> export) {
+    return generateMutations(export.getSequence(), export.getValue());
+  }
+
+  /**
+   * Returns LogEntry filter for Accumulo replication
+   */
+  public static Predicate<LogEntry> getFilter() {
+    return le -> le.getOp().equals(LogEntry.Operation.DELETE)
+        || le.getOp().equals(LogEntry.Operation.SET);
+  }
+
+  /**
+   * Generates Accumulo mutations from a Transaction log. Used to Replicate Fluo table to Accumulo.
+   *
+   * @param txLog Transaction log
+   * @param seq Export sequence number
+   * @return Collection of mutations
+   */
+  public static Collection<Mutation> generateMutations(long seq, TxLog txLog) {
+    Map<Bytes, Mutation> mutationMap = new HashMap<>();
+    for (LogEntry le : txLog.getLogEntries()) {
+      LogEntry.Operation op = le.getOp();
+      Column col = le.getColumn();
+      byte[] cf = col.getFamily().toArray();
+      byte[] cq = col.getQualifier().toArray();
+      byte[] cv = col.getVisibility().toArray();
+      if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
+        Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray()));
+        if (op.equals(LogEntry.Operation.DELETE)) {
+          if (col.isVisibilitySet()) {
+            m.putDelete(cf, cq, new ColumnVisibility(cv), seq);
+          } else {
+            m.putDelete(cf, cq, seq);
+          }
+        } else {
+          if (col.isVisibilitySet()) {
+            m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray());
+          } else {
+            m.put(cf, cq, seq, le.getValue().toArray());
+          }
+        }
+      }
+    }
+    return mutationMap.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
deleted file mode 100644
index d6fefa7..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-
-/**
- * Implemented by users to export data to Accumulo by comparing the differences between a
- * RowColumn/Bytes map that is generated for old and new data and represents how the data should
- * exist in Accumulo. When comparing each row/column/value (RCV) of old and new data, mutations are
- * generated using the following rules:
- * <ul>
- * <li>If old and new data have the same RCV, nothing is done.
- * <li>If old and new data have same row/column but different values, an update mutation is created
- * for the row/column.
- * <li>If old data has a row/column that is not in the new data, a delete mutation is generated.
- * <li>If new data has a row/column that is not in the old data, an insert mutation is generated.
- * <li>Only one mutation is generated per row.
- * <li>The export sequence number is used for the timestamp in the mutation.
- * </ul>
- *
- * @param <K> Export queue key type
- * @param <V> Type of export value object used to generate data
- * @since 1.0.0
- */
-public abstract class DifferenceExport<K, V> implements AccumuloExport<K> {
-
-  private Optional<V> oldVal;
-  private Optional<V> newVal;
-
-  public DifferenceExport() {}
-
-  public DifferenceExport(Optional<V> oldVal, Optional<V> newVal) {
-    Objects.requireNonNull(oldVal);
-    Objects.requireNonNull(newVal);
-    Preconditions.checkArgument(oldVal.isPresent() || newVal.isPresent(),
-        "At least one value must be set");
-    this.oldVal = oldVal;
-    this.newVal = newVal;
-  }
-
-  /**
-   * Generates RowColumn/Bytes map of how data should exist in Accumulo. This map is generated for
-   * old and new data and compared to create export mutations that will be written to Accumulo.
-   * 
-   * @param key Export queue key
-   * @param val Export value object
-   * @return RowColumn/Bytes map of how data should exist in Accumulo
-   */
-  protected abstract Map<RowColumn, Bytes> generateData(K key, Optional<V> val);
-
-  @Override
-  public Collection<Mutation> toMutations(K key, long seq) {
-    Map<RowColumn, Bytes> oldData = generateData(key, oldVal);
-    Map<RowColumn, Bytes> newData = generateData(key, newVal);
-
-    Map<Bytes, Mutation> mutationMap = new HashMap<>();
-    for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      if (!newData.containsKey(rc)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
-        m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
-            seq);
-      }
-    }
-    for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      Column col = rc.getColumn();
-      Bytes newVal = entry.getValue();
-      Bytes oldVal = oldData.get(rc);
-      if (oldVal == null || !oldVal.equals(newVal)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
-        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
-      }
-    }
-    return mutationMap.values();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
deleted file mode 100644
index 9276c6d..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Predicate;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.transaction.LogEntry;
-import org.apache.fluo.recipes.core.transaction.TxLog;
-
-/**
- * An implementation of {@link AccumuloExport} that replicates a Fluo table to Accumulo using a
- * TxLog
- *
- * @param <K> Type of export queue key
- * @since 1.0.0
- */
-public class ReplicationExport<K> implements AccumuloExport<K> {
-
-  private TxLog txLog;
-
-  public ReplicationExport() {}
-
-  public ReplicationExport(TxLog txLog) {
-    Objects.requireNonNull(txLog);
-    this.txLog = txLog;
-  }
-
-  public static Predicate<LogEntry> getFilter() {
-    return le -> le.getOp().equals(LogEntry.Operation.DELETE)
-        || le.getOp().equals(LogEntry.Operation.SET);
-  }
-
-  @Override
-  public Collection<Mutation> toMutations(K key, long seq) {
-    Map<Bytes, Mutation> mutationMap = new HashMap<>();
-    for (LogEntry le : txLog.getLogEntries()) {
-      LogEntry.Operation op = le.getOp();
-      Column col = le.getColumn();
-      byte[] cf = col.getFamily().toArray();
-      byte[] cq = col.getQualifier().toArray();
-      byte[] cv = col.getVisibility().toArray();
-      if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
-        Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray()));
-        if (op.equals(LogEntry.Operation.DELETE)) {
-          if (col.isVisibilitySet()) {
-            m.putDelete(cf, cq, new ColumnVisibility(cv), seq);
-          } else {
-            m.putDelete(cf, cq, seq);
-          }
-        } else {
-          if (col.isVisibilitySet()) {
-            m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray());
-          } else {
-            m.put(cf, cq, seq, le.getValue().toArray());
-          }
-        }
-      }
-    }
-    return mutationMap.values();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
deleted file mode 100644
index 9d189e6..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-
-class SharedBatchWriter {
-
-  private static class Mutations {
-
-    List<Mutation> mutations;
-    CountDownLatch cdl = new CountDownLatch(1);
-
-    Mutations(Collection<Mutation> mutations) {
-      this.mutations = new ArrayList<>(mutations);
-    }
-  }
-
-  private static class ExportTask implements Runnable {
-
-    private BatchWriter bw;
-
-    ExportTask(String instanceName, String zookeepers, String user, String password, String table)
-        throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-      ZooKeeperInstance zki =
-          new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(
-              zookeepers));
-
-      // TODO need to close batch writer
-      Connector conn = zki.getConnector(user, new PasswordToken(password));
-      try {
-        bw = conn.createBatchWriter(table, new BatchWriterConfig());
-      } catch (TableNotFoundException tnfe) {
-        try {
-          conn.tableOperations().create(table);
-        } catch (TableExistsException e) {
-          // nothing to do
-        }
-
-        bw = conn.createBatchWriter(table, new BatchWriterConfig());
-      }
-    }
-
-    @Override
-    public void run() {
-
-      ArrayList<Mutations> exports = new ArrayList<>();
-
-      while (true) {
-        try {
-          exports.clear();
-
-          // gather export from all threads that have placed an item on the queue
-          exports.add(exportQueue.take());
-          exportQueue.drainTo(exports);
-
-          for (Mutations ml : exports) {
-            bw.addMutations(ml.mutations);
-          }
-
-          bw.flush();
-
-          // notify all threads waiting after flushing
-          for (Mutations ml : exports) {
-            ml.cdl.countDown();
-          }
-
-        } catch (InterruptedException | MutationsRejectedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-  }
-
-  private static LinkedBlockingQueue<Mutations> exportQueue = null;
-
-  private SharedBatchWriter(String instanceName, String zookeepers, String user, String password,
-      String table) throws Exception {
-
-    // TODO: fix this write to static and remove findbugs max rank override in pom.xml
-    exportQueue = new LinkedBlockingQueue<>(10000);
-    Thread queueProcessingTask =
-        new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
-
-    queueProcessingTask.setDaemon(true);
-    queueProcessingTask.start();
-  }
-
-  private static Map<String, SharedBatchWriter> exporters = new HashMap<>();
-
-  static synchronized SharedBatchWriter getInstance(String instanceName, String zookeepers,
-      String user, String password, String table) throws Exception {
-
-    String key =
-        instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
-
-    SharedBatchWriter ret = exporters.get(key);
-
-    if (ret == null) {
-      ret = new SharedBatchWriter(instanceName, zookeepers, user, password, table);
-      exporters.put(key, ret);
-    }
-
-    return ret;
-  }
-
-  void write(Collection<Mutation> mutations) {
-    Mutations work = new Mutations(mutations);
-    exportQueue.add(work);
-    try {
-      work.cdl.await();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
deleted file mode 100644
index 9da6773..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-/**
- * @since 1.0.0
- */
-public class TableInfo {
-  String instanceName;
-  String zookeepers;
-  String user;
-  String password;
-  String table;
-
-  public TableInfo(String instanceName, String zookeepers, String user, String password,
-      String table) {
-    this.instanceName = instanceName;
-    this.zookeepers = zookeepers;
-    this.user = user;
-    this.password = password;
-    this.table = table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
new file mode 100644
index 0000000..e4be08a
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloExportTest {
+
+  public static Map<RowColumn, Bytes> genData(String key, Optional<String> val) {
+    if (!val.isPresent()) {
+      return Collections.emptyMap();
+    }
+    Map<RowColumn, Bytes> rcMap = new HashMap<>();
+    String data = val.get();
+    for (int i = 0; i < data.length(); i++) {
+      char c = data.charAt(i);
+      rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c));
+    }
+    return rcMap;
+  }
+
+  public static Collection<Mutation> genMutations(String key, long seq, Optional<String> oldVal,
+      Optional<String> newVal) {
+    return AccumuloExportQueue.generateMutations(seq, genData(key, oldVal), genData(key, newVal));
+  }
+
+  public static Mutation makePut(String key, String val, long seq) {
+    Mutation m = new Mutation("r:" + key);
+    addPut(m, key, val, seq);
+    return m;
+  }
+
+  public static void addPut(Mutation m, String key, String val, long seq) {
+    m.put("cf:" + val, "", seq, "v:" + val);
+  }
+
+  public static Mutation makeDel(String key, String val, long seq) {
+    Mutation m = new Mutation("r:" + key);
+    addDel(m, key, val, seq);
+    return m;
+  }
+
+  public static void addDel(Mutation m, String key, String val, long seq) {
+    m.putDelete("cf:" + val, "", seq);
+  }
+
+  @Test
+  public void testDifferenceExport() {
+    Collection<Mutation> mutations;
+
+    mutations = genMutations("k1", 1, Optional.empty(), Optional.of("a"));
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
+
+    mutations = genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"));
+    Assert.assertEquals(0, mutations.size());
+
+    mutations = genMutations("k2", 2, Optional.of("b"), Optional.of("ab"));
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
+
+    mutations = genMutations("k3", 3, Optional.of("c"), Optional.of("d"));
+    Assert.assertEquals(1, mutations.size());
+    Mutation m = makeDel("k3", "c", 3);
+    addPut(m, "k3", "d", 3);
+    Assert.assertTrue(mutations.contains(m));
+
+    mutations = genMutations("k4", 4, Optional.of("e"), Optional.empty());
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
+
+    mutations = genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"));
+    Assert.assertEquals(1, mutations.size());
+    m = makeDel("k5", "e", 5);
+    addPut(m, "k5", "g", 5);
+    Assert.assertTrue(mutations.contains(m));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
deleted file mode 100644
index a6e1188..0000000
--- a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class DifferenceExportTest {
-
-  public class DiffExport extends DifferenceExport<String, String> {
-
-    DiffExport(Optional<String> oldV, Optional<String> newV) {
-      super(oldV, newV);
-    }
-
-    @Override
-    protected Map<RowColumn, Bytes> generateData(String key, Optional<String> val) {
-      if (!val.isPresent()) {
-        return Collections.emptyMap();
-      }
-      Map<RowColumn, Bytes> rcMap = new HashMap<>();
-      String data = val.get();
-      for (int i = 0; i < data.length(); i++) {
-        char c = data.charAt(i);
-        rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c));
-      }
-      return rcMap;
-    }
-  }
-
-  public static Mutation makePut(String key, String val, long seq) {
-    Mutation m = new Mutation("r:" + key);
-    addPut(m, key, val, seq);
-    return m;
-  }
-
-  public static void addPut(Mutation m, String key, String val, long seq) {
-    m.put("cf:" + val, "", seq, "v:" + val);
-  }
-
-  public static Mutation makeDel(String key, String val, long seq) {
-    Mutation m = new Mutation("r:" + key);
-    addDel(m, key, val, seq);
-    return m;
-  }
-
-  public static void addDel(Mutation m, String key, String val, long seq) {
-    m.putDelete("cf:" + val, "", seq);
-  }
-
-  @Test
-  public void testDifferenceExport() {
-    Collection<Mutation> mutations;
-
-    mutations = new DiffExport(Optional.empty(), Optional.of("a")).toMutations("k1", 1);
-    Assert.assertEquals(1, mutations.size());
-    Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
-
-    mutations = new DiffExport(Optional.of("ab"), Optional.of("ab")).toMutations("k2", 2);
-    Assert.assertEquals(0, mutations.size());
-
-    mutations = new DiffExport(Optional.of("b"), Optional.of("ab")).toMutations("k2", 2);
-    Assert.assertEquals(1, mutations.size());
-    Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
-
-    mutations = new DiffExport(Optional.of("c"), Optional.of("d")).toMutations("k3", 3);
-    Assert.assertEquals(1, mutations.size());
-    Mutation m = makeDel("k3", "c", 3);
-    addPut(m, "k3", "d", 3);
-    Assert.assertTrue(mutations.contains(m));
-
-    mutations = new DiffExport(Optional.of("e"), Optional.empty()).toMutations("k4", 4);
-    Assert.assertEquals(1, mutations.size());
-    Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
-
-    mutations = new DiffExport(Optional.of("ef"), Optional.of("fg")).toMutations("k5", 5);
-    Assert.assertEquals(1, mutations.size());
-    m = makeDel("k5", "e", 5);
-    addPut(m, "k5", "g", 5);
-    Assert.assertTrue(mutations.contains(m));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index 81d277b..fcc3a74 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@ -262,6 +262,10 @@ public class ExportQueue<K, V> {
       return bucketsPerTablet;
     }
 
+    public String getQueueId() {
+      return queueId;
+    }
+
     void save(SimpleConfiguration appConfig) {
       appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
       appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
index 38986df..38985f1 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
@@ -29,10 +29,8 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
-import org.apache.fluo.recipes.accumulo.export.TableInfo;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
 import org.apache.fluo.recipes.core.export.ExportQueue;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
 import org.apache.hadoop.io.Text;
@@ -41,30 +39,28 @@ import org.junit.Test;
 
 public class AccumuloExporterIT extends AccumuloExportITBase {
 
-  private String et;
+  private String exportTable;
   public static final String QUEUE_ID = "aeqt";
 
   @Override
   public void preFluoInitHook() throws Exception {
 
-    FluoConfiguration fluoConfig = getFluoConfiguration();
-    ExportQueue.configure(fluoConfig,
-        new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), String.class.getName(),
-            TestExport.class.getName(), 5).setBucketsPerTablet(1));
-
     // create and configure export table
-    et = "export" + tableCounter.getAndIncrement();
-    getAccumuloConnector().tableOperations().create(et);
+    exportTable = "export" + tableCounter.getAndIncrement();
+    getAccumuloConnector().tableOperations().create(exportTable);
+
     MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
-    AccumuloExporter.setExportTableInfo(fluoConfig, QUEUE_ID,
-        new TableInfo(miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER,
-            ACCUMULO_PASSWORD, et));
+
+    AccumuloExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID,
+        SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), 5)
+        .setBucketsPerTablet(1), new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(),
+        miniAccumulo.getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
   }
 
   @Test
   public void testAccumuloExport() throws Exception {
 
-    ExportQueue<String, TestExport> teq =
+    ExportQueue<String, String> teq =
         ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration());
 
     Assert.assertEquals(6, getFluoSplits().size());
@@ -122,9 +118,9 @@ public class AccumuloExporterIT extends AccumuloExportITBase {
     }
   }
 
-  private void export(ExportQueue<String, TestExport> teq, Transaction tx,
+  private void export(ExportQueue<String, String> teq, Transaction tx,
       Map<String, String> expected, String k, String v) {
-    teq.add(tx, k, new TestExport(v));
+    teq.add(tx, k, v);
     expected.put(k, v);
   }
 
@@ -134,7 +130,7 @@ public class AccumuloExporterIT extends AccumuloExportITBase {
   }
 
   private Map<String, String> getExports() throws Exception {
-    Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY);
+    Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY);
     Map<String, String> ret = new HashMap<>();
 
     for (Entry<Key, Value> entry : scanner) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
index 432fa4d..e5b4e1a 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
@@ -27,13 +27,11 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
-import org.apache.fluo.recipes.accumulo.export.ReplicationExport;
-import org.apache.fluo.recipes.accumulo.export.TableInfo;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
+import org.apache.fluo.recipes.accumulo.export.AccumuloReplicator;
 import org.apache.fluo.recipes.core.export.ExportQueue;
+import org.apache.fluo.recipes.core.transaction.TxLog;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
 import org.apache.fluo.recipes.core.transaction.RecordingTransaction;
 import org.apache.fluo.recipes.core.types.StringEncoder;
@@ -44,32 +42,31 @@ import org.junit.Test;
 
 public class AccumuloReplicatorIT extends AccumuloExportITBase {
 
-  private String et;
-  public static final String QUEUE_ID = "aeqt";
+  private String exportTable;
+  public static final String QUEUE_ID = "repq";
   private TypeLayer tl = new TypeLayer(new StringEncoder());
 
   @Override
   public void preFluoInitHook() throws Exception {
-    ExportQueue
-        .configure(
-            getFluoConfiguration(),
-            new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), Bytes.class
-                .getName(), AccumuloExport.class.getName(), 5));
 
     // create and configure export table
-    et = "export" + tableCounter.getAndIncrement();
-    getAccumuloConnector().tableOperations().create(et);
+    exportTable = "export" + tableCounter.getAndIncrement();
+    getAccumuloConnector().tableOperations().create(exportTable);
 
     MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
-    AccumuloExporter.setExportTableInfo(getFluoConfiguration(), QUEUE_ID, new TableInfo(
-        miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER,
-        ACCUMULO_PASSWORD, et));
+
+    AccumuloExportQueue.configure(
+        getFluoConfiguration(),
+        new ExportQueue.Options(QUEUE_ID, AccumuloReplicator.class.getName(), String.class
+            .getName(), TxLog.class.getName(), 5),
+        new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), miniAccumulo
+            .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
   }
 
   @Test
   public void testAccumuloReplicator() throws Exception {
 
-    ExportQueue<Bytes, AccumuloExport<?>> eq =
+    ExportQueue<String, TxLog> eq =
         ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration());
 
     MiniFluo miniFluo = getMiniFluo();
@@ -78,12 +75,12 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
       Map<String, String> expected = new HashMap<>();
 
       try (Transaction tx = fc.newTransaction()) {
-        RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+        RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
         TypedTransaction ttx = tl.wrap(rtx);
         write(ttx, expected, "k1", "v1");
         write(ttx, expected, "k2", "v2");
         write(ttx, expected, "k3", "v3");
-        eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+        eq.add(tx, "q1", rtx.getTxLog());
         tx.commit();
       }
 
@@ -91,13 +88,13 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
       Assert.assertEquals(expected, getExports());
 
       try (Transaction tx = fc.newTransaction()) {
-        RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+        RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
         TypedTransaction ttx = tl.wrap(rtx);
         write(ttx, expected, "k1", "v4");
         delete(ttx, expected, "k3");
         write(ttx, expected, "k2", "v5");
         write(ttx, expected, "k4", "v6");
-        eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+        eq.add(tx, "q1", rtx.getTxLog());
         tx.commit();
       }
 
@@ -105,13 +102,13 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
       Assert.assertEquals(expected, getExports());
 
       try (Transaction tx = fc.newTransaction()) {
-        RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter());
+        RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter());
         TypedTransaction ttx = tl.wrap(rtx);
         write(ttx, expected, "k2", "v7");
         write(ttx, expected, "k3", "v8");
         delete(ttx, expected, "k1");
         delete(ttx, expected, "k4");
-        eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog()));
+        eq.add(tx, "q1", rtx.getTxLog());
         tx.commit();
       }
 
@@ -131,7 +128,7 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
   }
 
   private Map<String, String> getExports() throws Exception {
-    Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY);
+    Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY);
     Map<String, String> ret = new HashMap<>();
 
     for (Entry<Key, Value> entry : scanner) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
new file mode 100644
index 0000000..5aa5812
--- /dev/null
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.test.export;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+
+public class SimpleExporter extends AccumuloExporter<String, String> {
+
+  @Override
+  protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+    Mutation m = new Mutation(export.getKey());
+    m.put("cf", "cq", export.getSequence(), export.getValue());
+    return Collections.singleton(m);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
deleted file mode 100644
index 9132e0b..0000000
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.test.export;
-
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
-
-public class TestExport implements AccumuloExport<String> {
-
-  private String value;
-
-  public TestExport() {}
-
-  public TestExport(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public Collection<Mutation> toMutations(String key, long seq) {
-    Mutation m = new Mutation(key);
-    m.put("cf", "cq", seq, value);
-    return Collections.singletonList(m);
-  }
-}