You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:58 UTC
[10/10] incubator-fluo-recipes git commit: Merge remote-tracking
branch 'mike/package-refactor'
Merge remote-tracking branch 'mike/package-refactor'
Conflicts:
docs/row-hasher.md
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
modules/core/src/main/java/org/apache/fluo/recipes/common/Pirtos.java
modules/core/src/main/java/org/apache/fluo/recipes/common/TableOptimizations.java
modules/core/src/main/java/org/apache/fluo/recipes/core/common/Pirtos.java
modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/22354d0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/22354d0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/22354d0f
Branch: refs/heads/master
Commit: 22354d0f7f03f532ec093b7ece61b5ae9fb070e9
Parents: a8b85f3 083e4af
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jul 15 18:01:55 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jul 15 18:01:55 2016 -0400
----------------------------------------------------------------------
docs/export-queue.md | 2 +-
docs/row-hasher.md | 4 +-
docs/serialization.md | 2 +-
docs/transient.md | 2 +-
.../recipes/accumulo/cmds/CompactTransient.java | 13 +-
.../recipes/accumulo/cmds/OptimizeTable.java | 6 +-
.../recipes/accumulo/export/AccumuloExport.java | 1 +
.../accumulo/export/AccumuloExporter.java | 5 +-
.../accumulo/export/DifferenceExport.java | 1 +
.../accumulo/export/ReplicationExport.java | 5 +-
.../accumulo/export/SharedBatchWriter.java | 14 +-
.../fluo/recipes/accumulo/export/TableInfo.java | 3 +
.../recipes/accumulo/ops/TableOperations.java | 9 +-
.../apache/fluo/recipes/common/RowRange.java | 82 ---
.../fluo/recipes/common/TableOptimizations.java | 82 ---
.../fluo/recipes/common/TransientRegistry.java | 79 ---
.../fluo/recipes/core/common/RowRange.java | 85 +++
.../recipes/core/common/TableOptimizations.java | 84 +++
.../recipes/core/common/TransientRegistry.java | 80 +++
.../fluo/recipes/core/data/RowHasher.java | 137 ++++
.../apache/fluo/recipes/core/export/Export.java | 41 ++
.../fluo/recipes/core/export/ExportBucket.java | 203 ++++++
.../fluo/recipes/core/export/ExportEntry.java | 22 +
.../recipes/core/export/ExportObserver.java | 143 ++++
.../fluo/recipes/core/export/ExportQueue.java | 277 ++++++++
.../fluo/recipes/core/export/Exporter.java | 67 ++
.../recipes/core/export/SequencedExport.java | 32 +
.../fluo/recipes/core/impl/BucketUtil.java | 24 +
.../fluo/recipes/core/map/CollisionFreeMap.java | 657 +++++++++++++++++++
.../core/map/CollisionFreeMapObserver.java | 54 ++
.../apache/fluo/recipes/core/map/Combiner.java | 34 +
.../recipes/core/map/NullUpdateObserver.java | 25 +
.../apache/fluo/recipes/core/map/Update.java | 46 ++
.../fluo/recipes/core/map/UpdateObserver.java | 35 +
.../core/serialization/SimpleSerializer.java | 59 ++
.../fluo/recipes/core/transaction/LogEntry.java | 116 ++++
.../core/transaction/RecordingTransaction.java | 66 ++
.../transaction/RecordingTransactionBase.java | 252 +++++++
.../fluo/recipes/core/transaction/TxLog.java | 81 +++
.../apache/fluo/recipes/core/types/Encoder.java | 86 +++
.../fluo/recipes/core/types/StringEncoder.java | 86 +++
.../fluo/recipes/core/types/TypeLayer.java | 488 ++++++++++++++
.../fluo/recipes/core/types/TypedLoader.java | 45 ++
.../fluo/recipes/core/types/TypedObserver.java | 46 ++
.../fluo/recipes/core/types/TypedSnapshot.java | 38 ++
.../recipes/core/types/TypedSnapshotBase.java | 555 ++++++++++++++++
.../recipes/core/types/TypedTransaction.java | 46 ++
.../core/types/TypedTransactionBase.java | 278 ++++++++
.../org/apache/fluo/recipes/data/RowHasher.java | 135 ----
.../org/apache/fluo/recipes/export/Export.java | 38 --
.../fluo/recipes/export/ExportBucket.java | 203 ------
.../apache/fluo/recipes/export/ExportEntry.java | 22 -
.../fluo/recipes/export/ExportObserver.java | 140 ----
.../apache/fluo/recipes/export/ExportQueue.java | 274 --------
.../apache/fluo/recipes/export/Exporter.java | 64 --
.../fluo/recipes/export/SequencedExport.java | 29 -
.../apache/fluo/recipes/impl/BucketUtil.java | 24 -
.../fluo/recipes/map/CollisionFreeMap.java | 657 -------------------
.../recipes/map/CollisionFreeMapObserver.java | 53 --
.../org/apache/fluo/recipes/map/Combiner.java | 31 -
.../fluo/recipes/map/NullUpdateObserver.java | 25 -
.../org/apache/fluo/recipes/map/Update.java | 43 --
.../apache/fluo/recipes/map/UpdateObserver.java | 34 -
.../recipes/serialization/SimpleSerializer.java | 56 --
.../fluo/recipes/transaction/LogEntry.java | 114 ----
.../transaction/RecordingTransaction.java | 64 --
.../transaction/RecordingTransactionBase.java | 250 -------
.../apache/fluo/recipes/transaction/TxLog.java | 79 ---
.../org/apache/fluo/recipes/types/Encoder.java | 86 ---
.../fluo/recipes/types/StringEncoder.java | 86 ---
.../apache/fluo/recipes/types/TypeLayer.java | 488 --------------
.../apache/fluo/recipes/types/TypedLoader.java | 45 --
.../fluo/recipes/types/TypedObserver.java | 46 --
.../fluo/recipes/types/TypedSnapshot.java | 38 --
.../fluo/recipes/types/TypedSnapshotBase.java | 555 ----------------
.../fluo/recipes/types/TypedTransaction.java | 46 --
.../recipes/types/TypedTransactionBase.java | 278 --------
.../fluo/recipes/common/TestGrouping.java | 95 ---
.../recipes/common/TransientRegistryTest.java | 48 --
.../fluo/recipes/core/common/TestGrouping.java | 93 +++
.../core/common/TransientRegistryTest.java | 48 ++
.../fluo/recipes/core/data/RowHasherTest.java | 62 ++
.../recipes/core/export/DocumentLoader.java | 36 +
.../recipes/core/export/DocumentObserver.java | 63 ++
.../recipes/core/export/ExportBufferIT.java | 106 +++
.../fluo/recipes/core/export/ExportQueueIT.java | 114 ++++
.../recipes/core/export/ExportTestBase.java | 286 ++++++++
.../recipes/core/export/GsonSerializer.java | 42 ++
.../fluo/recipes/core/export/OptionsTest.java | 51 ++
.../fluo/recipes/core/export/RefInfo.java | 26 +
.../fluo/recipes/core/export/RefUpdates.java | 43 ++
.../fluo/recipes/core/map/BigUpdateIT.java | 214 ++++++
.../recipes/core/map/CollisionFreeMapIT.java | 361 ++++++++++
.../fluo/recipes/core/map/DocumentLoader.java | 35 +
.../fluo/recipes/core/map/DocumentObserver.java | 89 +++
.../fluo/recipes/core/map/OptionsTest.java | 51 ++
.../fluo/recipes/core/map/SplitsTest.java | 76 +++
.../fluo/recipes/core/map/TestSerializer.java | 45 ++
.../recipes/core/map/WordCountCombiner.java | 36 +
.../recipes/core/map/WordCountObserver.java | 47 ++
.../transaction/RecordingTransactionTest.java | 227 +++++++
.../fluo/recipes/core/types/MockSnapshot.java | 30 +
.../recipes/core/types/MockSnapshotBase.java | 202 ++++++
.../recipes/core/types/MockTransaction.java | 36 +
.../recipes/core/types/MockTransactionBase.java | 90 +++
.../fluo/recipes/core/types/TypeLayerTest.java | 494 ++++++++++++++
.../apache/fluo/recipes/data/RowHasherTest.java | 62 --
.../fluo/recipes/export/DocumentLoader.java | 36 -
.../fluo/recipes/export/DocumentObserver.java | 63 --
.../fluo/recipes/export/ExportBufferIT.java | 106 ---
.../fluo/recipes/export/ExportQueueIT.java | 114 ----
.../fluo/recipes/export/ExportTestBase.java | 286 --------
.../fluo/recipes/export/GsonSerializer.java | 42 --
.../apache/fluo/recipes/export/OptionsTest.java | 51 --
.../org/apache/fluo/recipes/export/RefInfo.java | 26 -
.../apache/fluo/recipes/export/RefUpdates.java | 43 --
.../apache/fluo/recipes/map/BigUpdateIT.java | 214 ------
.../fluo/recipes/map/CollisionFreeMapIT.java | 361 ----------
.../apache/fluo/recipes/map/DocumentLoader.java | 35 -
.../fluo/recipes/map/DocumentObserver.java | 89 ---
.../apache/fluo/recipes/map/OptionsTest.java | 51 --
.../org/apache/fluo/recipes/map/SplitsTest.java | 76 ---
.../apache/fluo/recipes/map/TestSerializer.java | 45 --
.../fluo/recipes/map/WordCountCombiner.java | 36 -
.../fluo/recipes/map/WordCountObserver.java | 47 --
.../transaction/RecordingTransactionTest.java | 227 -------
.../apache/fluo/recipes/types/MockSnapshot.java | 30 -
.../fluo/recipes/types/MockSnapshotBase.java | 202 ------
.../fluo/recipes/types/MockTransaction.java | 36 -
.../fluo/recipes/types/MockTransactionBase.java | 90 ---
.../fluo/recipes/types/TypeLayerTest.java | 494 --------------
.../recipes/kryo/KryoSimplerSerializer.java | 5 +-
.../serialization/KryoSimpleSerializerTest.java | 45 ++
.../serialization/KryoSimpleSerializerTest.java | 45 --
.../recipes/spark/AccumuloRangePartitioner.java | 3 +
.../fluo/recipes/spark/FluoSparkHelper.java | 2 +
.../fluo/recipes/spark/FluoSparkTestUtil.java | 2 +
.../fluo/recipes/test/AccumuloExportITBase.java | 4 +-
.../apache/fluo/recipes/test/FluoITHelper.java | 2 +
.../recipes/test/export/AccumuloExporterIT.java | 2 +-
.../test/export/AccumuloReplicatorIT.java | 10 +-
141 files changed, 7395 insertions(+), 7334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/docs/row-hasher.md
----------------------------------------------------------------------
diff --cc docs/row-hasher.md
index 78aab23,8db8af6..fdd8218
--- a/docs/row-hasher.md
+++ b/docs/row-hasher.md
@@@ -31,8 -31,8 +31,8 @@@ balancing of the prefix
```java
import org.apache.fluo.api.data.Bytes;
- import org.apache.fluo.recipes.common.TableOptimizations;
- import org.apache.fluo.recipes.data.RowHasher;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.data.RowHasher;
public class RowHasherExample {
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
----------------------------------------------------------------------
diff --cc modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
index 7910bdb,2a56cfb..92651bd
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/OptimizeTable.java
@@@ -19,8 -19,11 +19,11 @@@ import javax.inject.Inject
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.recipes.accumulo.ops.TableOperations;
- import org.apache.fluo.recipes.common.TableOptimizations;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ /**
+ * @since 1.0.0
+ */
public class OptimizeTable {
// when run with fluo exec command, the applications fluo config will be injected
@@@ -33,9 -36,7 +36,8 @@@
System.exit(-1);
}
-
- TableOperations.optimizeTable(fluoConfig, Pirtos.getConfiguredOptimizations(fluoConfig));
+ TableOperations.optimizeTable(fluoConfig,
+ TableOptimizations.getConfiguredOptimizations(fluoConfig));
System.out.println("Finished optimizing table");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
----------------------------------------------------------------------
diff --cc modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
index c4fd07f,3cc418c..e433317
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
@@@ -28,9 -28,9 +28,9 @@@ import org.apache.fluo.api.client.FluoF
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
- import org.apache.fluo.recipes.common.TableOptimizations;
- import org.apache.fluo.recipes.common.RowRange;
- import org.apache.fluo.recipes.common.TransientRegistry;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
index 0000000,0000000..4657366
new file mode 100644
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TableOptimizations.java
@@@ -1,0 -1,0 +1,84 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software distributed under the License
++ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
++ * or implied. See the License for the specific language governing permissions and limitations under
++ * the License.
++ */
++
++package org.apache.fluo.recipes.core.common;
++
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
++import java.util.Objects;
++
++import org.apache.fluo.api.client.FluoClient;
++import org.apache.fluo.api.client.FluoFactory;
++import org.apache.fluo.api.config.FluoConfiguration;
++import org.apache.fluo.api.config.SimpleConfiguration;
++import org.apache.fluo.api.data.Bytes;
++import org.apache.fluo.recipes.core.export.ExportQueue;
++import org.apache.fluo.recipes.core.map.CollisionFreeMap;
++
++/**
++ * Post initialization recommended table optimizations.
++ *
++ * @since 1.0.0
++ */
++public class TableOptimizations {
++ private List<Bytes> splits = new ArrayList<>();
++ private String tabletGroupingRegex = "";
++
++ public void setSplits(List<Bytes> splits) {
++ this.splits.clear();
++ this.splits.addAll(splits);
++ }
++
++ /**
++ * @return A recommended set of split points to add to a Fluo table after initialization.
++ */
++ public List<Bytes> getSplits() {
++ return Collections.unmodifiableList(splits);
++ }
++
++ public void setTabletGroupingRegex(String tgr) {
++ Objects.requireNonNull(tgr);
++ this.tabletGroupingRegex = tgr;
++ }
++
++ public String getTabletGroupingRegex() {
++ return "(" + tabletGroupingRegex + ").*";
++ }
++
++ public void merge(TableOptimizations other) {
++ splits.addAll(other.splits);
++ if (tabletGroupingRegex.length() > 0 && other.tabletGroupingRegex.length() > 0) {
++ tabletGroupingRegex += "|" + other.tabletGroupingRegex;
++ } else {
++ tabletGroupingRegex += other.tabletGroupingRegex;
++ }
++ }
++
++ /**
++ * A utility method to get table optimizations for all configured recipes.
++ */
++ public static TableOptimizations getConfiguredOptimizations(FluoConfiguration fluoConfig) {
++ try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
++ SimpleConfiguration appConfig = client.getAppConfiguration();
++ TableOptimizations tableOptim = new TableOptimizations();
++
++ tableOptim.merge(ExportQueue.getTableOptimizations(appConfig));
++ tableOptim.merge(CollisionFreeMap.getTableOptimizations(appConfig));
++
++ return tableOptim;
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
index 0000000,ace7e7e..e40ce9b
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/data/RowHasher.java
@@@ -1,0 -1,137 +1,137 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package org.apache.fluo.recipes.core.data;
+
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.regex.Pattern;
+
+ import com.google.common.base.Preconditions;
+ import com.google.common.base.Strings;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.api.data.BytesBuilder;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+
+ /**
+ * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe
+ * rows are structured like the following.
+ *
+ * <p>
+ * {@code <prefix>:<fixed len row hash>:<user row>}
+ *
+ * <p>
+ * The recipe also provides code to help generate split points and configure balancing of the
+ * prefix.
+ *
+ * <p>
+ * The project documentation has more information.
+ *
+ * @since 1.0.0
+ */
+ public class RowHasher {
+
+ private static final int HASH_LEN = 4;
+
- public Pirtos getTableOptimizations(int numTablets) {
++ public TableOptimizations getTableOptimizations(int numTablets) {
+
+ List<Bytes> splits = new ArrayList<>(numTablets - 1);
+
+ int numSplits = numTablets - 1;
+ int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1;
+ int split = distance;
+ for (int i = 0; i < numSplits; i++) {
+ splits.add(Bytes.of(prefix
+ + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0')));
+ split += distance;
+ }
+
+ splits.add(Bytes.of(prefix + "~"));
+
+
- Pirtos pirtos = new Pirtos();
- pirtos.setSplits(splits);
- pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
++ TableOptimizations tableOptim = new TableOptimizations();
++ tableOptim.setSplits(splits);
++ tableOptim.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
+
- return pirtos;
++ return tableOptim;
+ }
+
+
+ private Bytes prefix;
+
+ public RowHasher(String prefix) {
+ this.prefix = Bytes.of(prefix + ":");
+ }
+
+ /**
+ * @return Returns input with prefix and hash of input prepended.
+ */
+ public Bytes addHash(String row) {
+ return addHash(Bytes.of(row));
+ }
+
+ /**
+ * @return Returns input with prefix and hash of input prepended.
+ */
+ public Bytes addHash(Bytes row) {
+ BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length());
+ builder.append(prefix);
+ builder.append(genHash(row));
+ builder.append(":");
+ builder.append(row);
+ return builder.toBytes();
+ }
+
+ private boolean hasHash(Bytes row) {
+ for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) {
+ byte b = row.byteAt(i);
+ boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+ if (!isAlphaNum) {
+ return false;
+ }
+ }
+
+ if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @return Returns input with prefix and hash stripped from beginning.
+ */
+ public Bytes removeHash(Bytes row) {
+ Preconditions.checkArgument(row.length() >= prefix.length() + 5,
+ "Row is shorter than expected " + row);
+ Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix),
+ "Row does not have expected prefix " + row);
+ Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row);
+ return row.subSequence(prefix.length() + 5, row.length());
+ }
+
+ private static String genHash(Bytes row) {
+ int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+ hash = hash & 0x7fffffff;
+ // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
+ // nice for debugging.
+ String hashString =
+ Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+ hashString = hashString.substring(hashString.length() - HASH_LEN);
+
+ return hashString;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index 0000000,ac04f80..dffa713
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@@ -1,0 -1,276 +1,277 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package org.apache.fluo.recipes.core.export;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+
+ import com.google.common.base.Preconditions;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.client.TransactionBase;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.config.ObserverConfiguration;
+ import org.apache.fluo.api.config.SimpleConfiguration;
+ import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
+ import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+ /**
+ * @since 1.0.0
+ */
+ public class ExportQueue<K, V> {
+
+ private static final String RANGE_BEGIN = "#";
+ private static final String RANGE_END = ":~";
+
+ private int numBuckets;
+ private SimpleSerializer serializer;
+ private String queueId;
+
+ // usage hint : could be created once in an observers init method
+ // usage hint : maybe have a queue for each type of data being exported???
+ // maybe fewer queues are
+ // more efficient though because more batching at export time??
+ ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
+ // TODO sanity check key type based on type params
+ // TODO defer creating classes until needed.. so that its not done during Fluo init
+ this.queueId = opts.queueId;
+ this.numBuckets = opts.numBuckets;
+ this.serializer = serializer;
+ }
+
+ public void add(TransactionBase tx, K key, V value) {
+ addAll(tx, Collections.singleton(new Export<>(key, value)).iterator());
+ }
+
+ public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
+
+ Set<Integer> bucketsNotified = new HashSet<>();
+ while (exports.hasNext()) {
+ Export<K, V> export = exports.next();
+
+ byte[] k = serializer.serialize(export.getKey());
+ byte[] v = serializer.serialize(export.getValue());
+
+ int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+ int bucketId = Math.abs(hash % numBuckets);
+
+ ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets);
+ bucket.add(tx.getStartTimestamp(), k, v);
+
+ if (!bucketsNotified.contains(bucketId)) {
+ bucket.notifyExportObserver();
+ bucketsNotified.add(bucketId);
+ }
+ }
+ }
+
+ public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId,
+ SimpleConfiguration appConfig) {
+ Options opts = new Options(exportQueueId, appConfig);
+ try {
+ return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig));
+ } catch (Exception e) {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Call this method before initializing Fluo.
+ *
+ * @param fluoConfig The configuration that will be used to initialize fluo.
+ */
+ public static void configure(FluoConfiguration fluoConfig, Options opts) {
+ SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
+ opts.save(appConfig);
+
+ fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName())
+ .setParameters(Collections.singletonMap("queueId", opts.queueId)));
+
+ Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+ Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+
+ new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
+ + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
+ }
+
+ /**
+ * Return suggested Fluo table optimizations for all previously configured export queues.
+ *
+ * @param appConfig Must pass in the application configuration obtained from
+ * {@code FluoClient.getAppConfiguration()} or
+ * {@code FluoConfiguration.getAppConfiguration()}
+ */
+
- public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
++ public static TableOptimizations getTableOptimizations(SimpleConfiguration appConfig) {
+ HashSet<String> queueIds = new HashSet<>();
+ appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+ k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+
- Pirtos pirtos = new Pirtos();
- queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig)));
++ TableOptimizations tableOptim = new TableOptimizations();
++ queueIds.forEach(qid -> tableOptim.merge(getTableOptimizations(qid, appConfig)));
+
- return pirtos;
++ return tableOptim;
+ }
+
+ /**
+ * Return suggested Fluo table optimizations for the specified export queue.
+ *
+ * @param appConfig Must pass in the application configuration obtained from
+ * {@code FluoClient.getAppConfiguration()} or
+ * {@code FluoConfiguration.getAppConfiguration()}
+ */
- public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
++ public static TableOptimizations getTableOptimizations(String queueId,
++ SimpleConfiguration appConfig) {
+ Options opts = new Options(queueId, appConfig);
+
+ List<Bytes> splits = new ArrayList<>();
+
+ Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
+ Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
+
+ splits.add(exportRangeStart);
+ splits.add(exportRangeStop);
+
+ List<Bytes> exportSplits = new ArrayList<>();
+ for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+ exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
+ }
+ Collections.sort(exportSplits);
+ splits.addAll(exportSplits);
+
- Pirtos pirtos = new Pirtos();
- pirtos.setSplits(splits);
++ TableOptimizations tableOptim = new TableOptimizations();
++ tableOptim.setSplits(splits);
+
+ // the tablet with end row <queueId># does not contain any data for the export queue and
+ // should not be grouped with the export queue
- pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
++ tableOptim.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
+
- return pirtos;
++ return tableOptim;
+ }
+
+ public static class Options {
+
+ private static final String PREFIX = "recipes.exportQueue.";
+ static final long DEFAULT_BUFFER_SIZE = 1 << 20;
+ static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+
+ int numBuckets;
+ Integer bucketsPerTablet = null;
+ Long bufferSize;
+
+ String keyType;
+ String valueType;
+ String exporterType;
+ String queueId;
+
+ Options(String queueId, SimpleConfiguration appConfig) {
+ this.queueId = queueId;
+
+ this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
+ this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
+ this.keyType = appConfig.getString(PREFIX + queueId + ".key");
+ this.valueType = appConfig.getString(PREFIX + queueId + ".val");
+ this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+ this.bucketsPerTablet =
+ appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+ }
+
+ public Options(String queueId, String exporterType, String keyType, String valueType,
+ int buckets) {
+ Preconditions.checkArgument(buckets > 0);
+
+ this.queueId = queueId;
+ this.numBuckets = buckets;
+ this.exporterType = exporterType;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+
+ public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter,
+ Class<K> keyType, Class<V> valueType, int buckets) {
+ this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
+ }
+
+ /**
+ * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+ * be used to actually deserialize and process the updates. This limit does not account for
+ * object overhead in java, which can be significant.
+ *
+ * <p>
+ * The way memory read is calculated is by summing the length of serialized key and value byte
+ * arrays. Once this sum exceeds the configured memory limit, no more export key values are
+ * processed in the current transaction. When not everything is processed, the observer
+ * processing exports will notify itself causing another transaction to continue processing
+ * later.
+ */
+ public Options setBufferSize(long bufferSize) {
+ Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ long getBufferSize() {
+ if (bufferSize == null) {
+ return DEFAULT_BUFFER_SIZE;
+ }
+
+ return bufferSize;
+ }
+
+ /**
+ * Sets the number of buckets per tablet to generate. This affects how many split points will be
+ * generated when optimizing the Accumulo table.
+ *
+ */
+ public Options setBucketsPerTablet(int bucketsPerTablet) {
+ Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+ + bucketsPerTablet);
+ this.bucketsPerTablet = bucketsPerTablet;
+ return this;
+ }
+
+ int getBucketsPerTablet() {
+ if (bucketsPerTablet == null) {
+ return DEFAULT_BUCKETS_PER_TABLET;
+ }
+
+ return bucketsPerTablet;
+ }
+
+ void save(SimpleConfiguration appConfig) {
+ appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
+ appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
+ appConfig.setProperty(PREFIX + queueId + ".key", keyType);
+ appConfig.setProperty(PREFIX + queueId + ".val", valueType);
+
+ if (bufferSize != null) {
+ appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
+ }
+ if (bucketsPerTablet != null) {
+ appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 0000000,6cbc658..2fe4a7c
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@@ -1,0 -1,657 +1,657 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package org.apache.fluo.recipes.core.map;
+
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Optional;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.Sets;
+ import com.google.common.hash.Hashing;
+ import org.apache.fluo.api.client.SnapshotBase;
+ import org.apache.fluo.api.client.TransactionBase;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.config.ObserverConfiguration;
+ import org.apache.fluo.api.config.ScannerConfiguration;
+ import org.apache.fluo.api.config.SimpleConfiguration;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.api.data.BytesBuilder;
+ import org.apache.fluo.api.data.Column;
+ import org.apache.fluo.api.data.RowColumn;
+ import org.apache.fluo.api.data.RowColumnValue;
+ import org.apache.fluo.api.data.Span;
+ import org.apache.fluo.api.iterator.ColumnIterator;
+ import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.common.RowRange;
+ import org.apache.fluo.recipes.core.common.TransientRegistry;
+ import org.apache.fluo.recipes.core.impl.BucketUtil;
+ import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+ /**
+ * See the project level documentation for information about this recipe.
+ *
+ * @since 1.0.0
+ */
+ public class CollisionFreeMap<K, V> {
+
+ private static final String UPDATE_RANGE_END = ":u:~";
+
+ private static final String DATA_RANGE_END = ":d:~";
+
+ private String mapId;
+
+ private Class<K> keyType;
+ private Class<V> valType;
+ private SimpleSerializer serializer;
+ private Combiner<K, V> combiner;
+ UpdateObserver<K, V> updateObserver;
+ private long bufferSize;
+
+ static final Column UPDATE_COL = new Column("u", "v");
+ static final Column NEXT_COL = new Column("u", "next");
+
+ private int numBuckets = -1;
+
+ @SuppressWarnings("unchecked")
+ CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
+
+ this.mapId = opts.mapId;
+ // TODO defer loading classes
+ // TODO centralize class loading
+ // TODO try to check type params
+ this.numBuckets = opts.numBuckets;
+ this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
+ this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
+ this.combiner =
+ (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance();
+ this.serializer = serializer;
+ if (opts.updateObserverType != null) {
+ this.updateObserver =
+ getClass().getClassLoader().loadClass(opts.updateObserverType)
+ .asSubclass(UpdateObserver.class).newInstance();
+ } else {
+ this.updateObserver = new NullUpdateObserver<>();
+ }
+ this.bufferSize = opts.getBufferSize();
+ }
+
+ private V deserVal(Bytes val) {
+   // Turn the stored bytes back into a value object using the configured serializer.
+   byte[] raw = val.toArray();
+   return serializer.deserialize(raw, valType);
+ }
+
+ private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
+   // An update row is <prefix><serialized key><8 byte sequence>; cut away both ends.
+   int keyStart = prefix.length();
+   int keyEnd = row.length() - 8;
+   return row.subSequence(keyStart, keyEnd);
+ }
+
+ /**
+  * Processes the queued updates of one bucket. Update rows under {@code ntfyRow} are read and
+  * deleted until roughly {@code bufferSize} bytes were buffered, the updates of each key are
+  * combined with the key's current value, changed values are written back, and the configured
+  * {@link UpdateObserver} is told about the changes.
+  *
+  * <p>
+  * When the memory limit stops a pass early, the position to resume from is stored in
+  * {@code NEXT_COL} and this method notifies itself so a later transaction continues.
+  *
+  * @param tx transaction used to read and delete queued updates and to write combined values
+  * @param ntfyRow notified row; it is also the row prefix of the bucket's update rows
+  * @param col notification column, used when this method needs to notify itself again
+  */
+ void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
+
+   Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
+
+   ScannerConfiguration sc = new ScannerConfiguration();
+
+   if (nextKey != null) {
+     // A previous pass stopped early; resume right after the recorded position instead of
+     // scanning from the start of the bucket again.
+     Bytes startRow =
+         Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
+             .toBytes();
+     Span tmpSpan = Span.prefix(ntfyRow);
+     Span nextSpan =
+         new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
+             tmpSpan.isEndInclusive());
+     sc.setSpan(nextSpan);
+   } else {
+     sc.setSpan(Span.prefix(ntfyRow));
+   }
+
+   // The span chosen above must not be overwritten here, otherwise the resume position stored
+   // in NEXT_COL is ignored and every pass rescans the bucket from its beginning.
+   sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
+   RowIterator iter = tx.get(sc);
+
+   Map<Bytes, List<Bytes>> updates = new HashMap<>();
+
+   long approxMemUsed = 0;
+
+   Bytes partiallyReadKey = null;
+   boolean stoppedEarly = false;
+
+   if (iter.hasNext()) {
+     Bytes lastKey = null;
+     while (iter.hasNext() && approxMemUsed < bufferSize) {
+       Entry<Bytes, ColumnIterator> rowCol = iter.next();
+       Bytes curRow = rowCol.getKey();
+
+       // the update is consumed by this transaction
+       tx.delete(curRow, UPDATE_COL);
+
+       Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
+       lastKey = serializedKey;
+
+       Bytes val = rowCol.getValue().next().getValue();
+       updates.computeIfAbsent(serializedKey, unused -> new ArrayList<>()).add(val);
+
+       approxMemUsed += curRow.length();
+       approxMemUsed += val.length();
+     }
+
+     if (iter.hasNext()) {
+       Bytes peekedRow = iter.next().getKey();
+
+       // check if more updates for last key
+       if (getKeyFromUpdateRow(ntfyRow, peekedRow).equals(lastKey)) {
+         // there are still more updates for this key
+         partiallyReadKey = lastKey;
+
+         // start next time at the current key
+         tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
+       } else {
+         // start next time at the next possible key
+         Bytes nextPossible =
+             Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
+                 .toBytes();
+         tx.set(ntfyRow, NEXT_COL, nextPossible);
+       }
+
+       stoppedEarly = true;
+     } else if (nextKey != null) {
+       // clear nextKey
+       tx.delete(ntfyRow, NEXT_COL);
+     }
+   } else if (nextKey != null) {
+     tx.delete(ntfyRow, NEXT_COL);
+   }
+
+   if (stoppedEarly || nextKey != null) {
+     // Another pass is needed. Either the memory limit stopped this pass before the end of the
+     // bucket, or this pass started in the middle of the bucket and updates queued before the
+     // start position have not been looked at.
+     tx.setWeakNotification(ntfyRow, col);
+   }
+
+   byte[] dataPrefix = ntfyRow.toArray();
+   // TODO this is awful... no sanity check... hard to read
+   // Switches the "<mapId>:u:" update prefix into the "<mapId>:d:" data prefix.
+   dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
+
+   BytesBuilder rowBuilder = Bytes.newBuilder();
+   rowBuilder.append(dataPrefix);
+   int rowPrefixLen = rowBuilder.getLength();
+
+   // The current value of a partially read key is not needed, its updates are requeued below.
+   Set<Bytes> keysToFetch = updates.keySet();
+   if (partiallyReadKey != null) {
+     final Bytes prk = partiallyReadKey;
+     keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
+   }
+   Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
+
+   ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
+
+   for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
+     rowBuilder.setLength(rowPrefixLen);
+     Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
+     Bytes currVal =
+         currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
+
+     Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
+
+     K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
+
+     if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
+       // not all updates were read for this key, so requeue the combined updates as an update
+       Optional<V> nv = combiner.combine(kd, ui);
+       if (nv.isPresent()) {
+         update(tx, Collections.singletonMap(kd, nv.get()));
+       }
+     } else {
+       Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
+       Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
+
+       // only write and report when the stored value actually changes
+       if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
+         if (newVal == null) {
+           tx.delete(currentValueRow, DATA_COLUMN);
+         } else {
+           tx.set(currentValueRow, DATA_COLUMN, newVal);
+         }
+
+         Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
+         updatesToReport.add(new Update<>(kd, cvd, nv));
+       }
+     }
+   }
+
+   // TODO could clear these as converted to objects to avoid double memory usage
+   updates.clear();
+   currentVals.clear();
+
+   if (!updatesToReport.isEmpty()) {
+     updateObserver.updatingValues(tx, updatesToReport.iterator());
+   }
+ }
+
+ private static final Column DATA_COLUMN = new Column("data", "current");
+
+ /**
+  * Reads the current value column of the data rows belonging to the given keys.
+  *
+  * @param tx transaction used for the read
+  * @param prefix builder holding the data row prefix; its content after the prefix is overwritten
+  * @param keySet serialized keys whose current values should be read
+  * @return the columns read, keyed by data row
+  * @throws IllegalArgumentException if the underlying read rejects the request; the message
+  *         states how many rows were requested and the original exception is kept as the cause
+  */
+ private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
+     Set<Bytes> keySet) {
+
+   Set<Bytes> rows = new HashSet<>();
+
+   int prefixLen = prefix.getLength();
+   for (Bytes key : keySet) {
+     // rewind to the shared prefix before appending the next key
+     prefix.setLength(prefixLen);
+     rows.add(prefix.append(key).toBytes());
+   }
+
+   try {
+     return tx.get(rows, Collections.singleton(DATA_COLUMN));
+   } catch (IllegalArgumentException e) {
+     // Report the request size through the exception instead of printing it to stdout.
+     throw new IllegalArgumentException("Failed to read current values for " + rows.size()
+         + " row(s)", e);
+   }
+ }
+
+ private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
+   // With a stored value present, it is appended after all queued updates.
+   if (currentVal != null) {
+     Iterator<V> current = Iterators.singletonIterator(deserVal(currentVal));
+     return Iterators.concat(updates, current);
+   }
+
+   return updates;
+ }
+
+ /**
+  * This method will retrieve the current value for key and any outstanding updates and combine
+  * them using the configured {@link Combiner}. The result from the combiner is returned.
+  *
+  * @param tx snapshot or transaction used to read the queued updates and the current value
+  * @param key key to look up
+  * @return the combined value, the stored value when no updates are queued, or {@code null} when
+  *         nothing is stored and nothing is queued or the combiner yields no value
+  */
+ public V get(SnapshotBase tx, K key) {
+
+   byte[] k = serializer.serialize(key);
+
+   int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+   String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+   BytesBuilder rowBuilder = Bytes.newBuilder();
+   rowBuilder.append(mapId);
+   // Remember the encoded length of the id in bytes; the number of chars in the string can
+   // differ from it when the id contains non-ASCII characters.
+   final int mapIdLen = rowBuilder.getLength();
+   rowBuilder.append(":u:").append(bucketId).append(":").append(k);
+
+   // Update rows are the prefix built above followed by an 8 byte sequence. A prefix scan also
+   // matches rows of other keys in this bucket whose serialized form merely starts with this
+   // key's bytes; those rows are longer, so the exact length identifies this key's rows.
+   final int updateRowLen = rowBuilder.getLength() + 8;
+
+   ScannerConfiguration sc = new ScannerConfiguration();
+   sc.setSpan(Span.prefix(rowBuilder.toBytes()));
+   sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
+
+   RowIterator iter = tx.get(sc);
+
+   Iterator<Entry<Bytes, ColumnIterator>> keyRows =
+       Iterators.filter(iter, e -> e.getKey().length() == updateRowLen);
+   Iterator<V> ui = Iterators.transform(keyRows, e -> deserVal(e.getValue().next().getValue()));
+
+   rowBuilder.setLength(mapIdLen);
+   rowBuilder.append(":d:").append(bucketId).append(":").append(k);
+
+   Bytes dataRow = rowBuilder.toBytes();
+
+   Bytes cv = tx.get(dataRow, DATA_COLUMN);
+
+   if (!ui.hasNext()) {
+     // nothing queued, so the stored value (if any) is the answer
+     return cv == null ? null : deserVal(cv);
+   }
+
+   return combiner.combine(key, concat(ui, cv)).orElse(null);
+ }
+
+ /**
+ * Returns the id of this map, as taken from the {@link Options} it was constructed with.
+ */
+ String getId() {
+ return mapId;
+ }
+
+ /**
+  * Queues updates for this collision free map. The queued updates are applied later by an
+  * Observer running in a different transaction. Because every queued update is written to its
+  * own row, calling this method does not collide with other transactions queuing updates for
+  * the same keys.
+  *
+  * @param tx This transaction will be used to make the updates.
+  * @param updates The keys in the map should correspond to keys in the collision free map being
+  *        updated. The values in the map will be queued for updating.
+  */
+ public void update(TransactionBase tx, Map<K, V> updates) {
+   Preconditions.checkState(numBuckets > 0, "Not initialized");
+
+   // The same notification column is used for every bucket touched by this call.
+   final Column ntfyCol = new Column("fluoRecipes", "cfm:" + mapId);
+
+   BytesBuilder rowBuilder = Bytes.newBuilder();
+   rowBuilder.append(mapId).append(":u:");
+   final int prefixLength = rowBuilder.getLength();
+
+   // The transaction's start timestamp is appended to each row written by this call.
+   final byte[] seq = encSeq(tx.getStartTimestamp());
+
+   Set<String> touchedBuckets = new HashSet<>();
+
+   for (Entry<K, V> queued : updates.entrySet()) {
+     byte[] keyBytes = serializer.serialize(queued.getKey());
+     int hash = Hashing.murmur3_32().hashBytes(keyBytes).asInt();
+     String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+     // reset to the common row prefix
+     rowBuilder.setLength(prefixLength);
+     rowBuilder.append(bucketId).append(":").append(keyBytes).append(seq);
+     Bytes updateRow = rowBuilder.toBytes();
+
+     Bytes updateVal = Bytes.of(serializer.serialize(queued.getValue()));
+
+     // TODO set if not exists would be comforting here.... but
+     // collisions on bucketId+key+uuid should never occur
+     tx.set(updateRow, UPDATE_COL, updateVal);
+
+     touchedBuckets.add(bucketId);
+   }
+
+   // One weak notification per touched bucket wakes up the observer for that bucket.
+   for (String bucketId : touchedBuckets) {
+     rowBuilder.setLength(prefixLength);
+     Bytes bucketRow = rowBuilder.append(bucketId).append(":").toBytes();
+     tx.setWeakNotification(bucketRow, ntfyCol);
+   }
+ }
+
+ /**
+  * Creates a map instance from the settings previously stored for {@code mapId} in the
+  * application configuration. Any exception raised while creating the serializer or the map is
+  * rethrown wrapped in a {@link RuntimeException}.
+  */
+ public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
+     SimpleConfiguration appConf) {
+   final Options storedOpts = new Options(mapId, appConf);
+
+   try {
+     SimpleSerializer serializer = SimpleSerializer.getInstance(appConf);
+     return new CollisionFreeMap<>(storedOpts, serializer);
+   } catch (Exception e) {
+     // TODO
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+ * A {@link CollisionFreeMap} stores data in its own data format in the Fluo table. When
+ * initializing a Fluo table with something like Map Reduce or Spark, data will need to be written
+ * in this format. That is the purpose of this method: it provides a simple class that can do
+ * this conversion.
+ *
+ * @param mapId id of the map the converted data is written for
+ * @param numBuckets number of buckets used to spread keys; the bucket of a key is derived from
+ * this value, so it needs to match the bucket count the map is configured with
+ * @param serializer serializer used to turn keys and values into bytes
+ * @return an {@link Initializer} that converts key value pairs into row, column, value triples
+ */
+ public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
+ SimpleSerializer serializer) {
+ return new Initializer<>(mapId, numBuckets, serializer);
+ }
+
+ /**
+  * Converts key value pairs into the row, column and value under which a collision free map
+  * stores its current data.
+  *
+  * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
+  */
+ public static class Initializer<K2, V2> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String mapId;
+
+   private SimpleSerializer serializer;
+
+   private int numBuckets = -1;
+
+   private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
+     this.mapId = mapId;
+     this.numBuckets = numBuckets;
+     this.serializer = serializer;
+   }
+
+   /**
+    * Builds the data row for {@code key} and pairs it with the serialized {@code val}.
+    */
+   public RowColumnValue convert(K2 key, V2 val) {
+     byte[] keyBytes = serializer.serialize(key);
+
+     // The bucket is chosen from a hash of the serialized key.
+     int bucket = Math.abs(Hashing.murmur3_32().hashBytes(keyBytes).asInt() % numBuckets);
+     String bucketId = BucketUtil.genBucketId(bucket, numBuckets);
+
+     BytesBuilder rowBuilder = Bytes.newBuilder();
+     rowBuilder.append(mapId).append(":d:").append(bucketId).append(":").append(keyBytes);
+     Bytes dataRow = rowBuilder.toBytes();
+
+     Bytes dataVal = Bytes.of(serializer.serialize(val));
+
+     return new RowColumnValue(dataRow, DATA_COLUMN, dataVal);
+   }
+ }
+
+ /**
+  * Settings of a single collision free map. Instances are either built by the application and
+  * stored in the application configuration, or read back from it.
+  */
+ public static class Options {
+
+   static final long DEFAULT_BUFFER_SIZE = 1 << 22;
+   static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+
+   private static final String PREFIX = "recipes.cfm.";
+
+   String mapId;
+   int numBuckets;
+
+   String keyType;
+   String valueType;
+   String combinerType;
+   String updateObserverType;
+
+   // null means not explicitly set, the getters then answer with the defaults
+   Long bufferSize;
+   Integer bucketsPerTablet = null;
+
+   /**
+    * Reads the settings stored for {@code mapId} in the given application configuration.
+    */
+   Options(String mapId, SimpleConfiguration appConfig) {
+     final String base = PREFIX + mapId;
+
+     this.mapId = mapId;
+
+     this.numBuckets = appConfig.getInt(base + ".buckets");
+     this.combinerType = appConfig.getString(base + ".combiner");
+     this.keyType = appConfig.getString(base + ".key");
+     this.valueType = appConfig.getString(base + ".val");
+     this.updateObserverType = appConfig.getString(base + ".updateObserver", null);
+     this.bufferSize = appConfig.getLong(base + ".bufferSize", DEFAULT_BUFFER_SIZE);
+     this.bucketsPerTablet =
+         appConfig.getInt(base + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+   }
+
+   /**
+    * Creates options for a map without an update observer, naming all types by class name.
+    */
+   public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
+     this(mapId, combinerType, (String) null, keyType, valType, buckets);
+   }
+
+   /**
+    * Creates options for a map, naming all types by class name.
+    */
+   public Options(String mapId, String combinerType, String updateObserverType, String keyType,
+       String valueType, int buckets) {
+     Preconditions.checkArgument(buckets > 0);
+     Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+
+     this.mapId = mapId;
+     this.numBuckets = buckets;
+     this.combinerType = combinerType;
+     this.updateObserverType = updateObserverType;
+     this.keyType = keyType;
+     this.valueType = valueType;
+   }
+
+   /**
+    * Creates options for a map without an update observer from class objects.
+    */
+   public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
+       Class<V> valueType, int buckets) {
+     this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
+   }
+
+   /**
+    * Creates options for a map with an update observer from class objects.
+    */
+   public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
+       Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
+       int buckets) {
+     this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(),
+         valueType.getName(), buckets);
+   }
+
+   /**
+    * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+    * be used to actually deserialize and process the updates. This limit does not account for
+    * object overhead in java, which can be significant.
+    *
+    * <p>
+    * The way memory read is calculated is by summing the length of serialized key and value byte
+    * arrays. Once this sum exceeds the configured memory limit, no more update key values are
+    * processed in the current transaction. When not everything is processed, the observer
+    * processing updates will notify itself causing another transaction to continue processing
+    * later.
+    */
+   public Options setBufferSize(long bufferSize) {
+     Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+     this.bufferSize = bufferSize;
+     return this;
+   }
+
+   long getBufferSize() {
+     return bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
+   }
+
+   /**
+    * Sets the number of buckets per tablet to generate. This affects how many split points will be
+    * generated when optimizing the Accumulo table.
+    */
+   public Options setBucketsPerTablet(int bucketsPerTablet) {
+     Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : %s",
+         bucketsPerTablet);
+     this.bucketsPerTablet = bucketsPerTablet;
+     return this;
+   }
+
+   int getBucketsPerTablet() {
+     return bucketsPerTablet == null ? DEFAULT_BUCKETS_PER_TABLET : bucketsPerTablet;
+   }
+
+   /**
+    * Writes these settings into the given application configuration. Optional settings are only
+    * written when they were set.
+    */
+   void save(SimpleConfiguration appConfig) {
+     final String base = PREFIX + mapId;
+
+     appConfig.setProperty(base + ".buckets", numBuckets + "");
+     appConfig.setProperty(base + ".combiner", combinerType + "");
+     appConfig.setProperty(base + ".key", keyType);
+     appConfig.setProperty(base + ".val", valueType);
+
+     if (updateObserverType != null) {
+       appConfig.setProperty(base + ".updateObserver", updateObserverType + "");
+     }
+     if (bufferSize != null) {
+       appConfig.setProperty(base + ".bufferSize", bufferSize);
+     }
+     if (bucketsPerTablet != null) {
+       appConfig.setProperty(base + ".bucketsPerTablet", bucketsPerTablet);
+     }
+   }
+ }
+
+ /**
+  * Configures a collision free map for use: stores its options in the application configuration,
+  * registers the observer that processes queued updates, and registers a transient range for
+  * the map. This method must be called before initializing Fluo.
+  */
+ public static void configure(FluoConfiguration fluoConfig, Options opts) {
+   opts.save(fluoConfig.getAppConfiguration());
+
+   ObserverConfiguration observerConfig =
+       new ObserverConfiguration(CollisionFreeMapObserver.class.getName());
+   fluoConfig.addObserver(observerConfig.setParameters(ImmutableMap.of("mapId", opts.mapId)));
+
+   // The registered range runs from the end of the data rows to the end of the update rows.
+   RowRange transientRange =
+       new RowRange(Bytes.of(opts.mapId + DATA_RANGE_END), Bytes.of(opts.mapId
+           + UPDATE_RANGE_END));
+
+   TransientRegistry registry = new TransientRegistry(fluoConfig.getAppConfiguration());
+   registry.addTransientRange("cfm." + opts.mapId, transientRange);
+ }
+
+ /**
+ * Return suggested Fluo table optimizations for all previously configured collision free maps.
+ * The ids of the configured maps are discovered from the property keys stored under the
+ * collision free map prefix, and the optimizations of each map are merged into one result.
+ *
+ * @param appConfig Must pass in the application configuration obtained from
+ * {@code FluoClient.getAppConfiguration()} or
+ * {@code FluoConfiguration.getAppConfiguration()}
+ * @return the merged optimizations of all maps found in {@code appConfig}
+ */
- public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
++ public static TableOptimizations getTableOptimizations(SimpleConfiguration appConfig) {
+ HashSet<String> mapIds = new HashSet<>();
+ // Keys look like <PREFIX><mapId>.<property>; strip the prefix and keep the text before the
+ // first dot. NOTE(review): a map id that itself contains '.' would be cut short here - confirm
+ // whether such ids need to be rejected when options are created.
+ appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
+ k -> mapIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
+
- Pirtos pirtos = new Pirtos();
- mapIds.forEach(mid -> pirtos.merge(getTableOptimizations(mid, appConfig)));
++ TableOptimizations tableOptim = new TableOptimizations();
++ mapIds.forEach(mid -> tableOptim.merge(getTableOptimizations(mid, appConfig)));
+
- return pirtos;
++ return tableOptim;
+ }
+
+ /**
+ * Return suggested Fluo table optimizations for the specified collision free map. The result
+ * carries split points for the map's data and update ranges and a tablet grouping regex that
+ * matches the map's data and update row prefixes.
+ *
+ * @param mapId id of a previously configured collision free map
+ * @param appConfig Must pass in the application configuration obtained from
+ * {@code FluoClient.getAppConfiguration()} or
+ * {@code FluoConfiguration.getAppConfiguration()}
+ * @return the suggested optimizations for the map
+ */
- public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
++ public static TableOptimizations getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
+ Options opts = new Options(mapId, appConfig);
+
+ BytesBuilder rowBuilder = Bytes.newBuilder();
+ rowBuilder.append(mapId);
+
+ // one split in the data range at every bucketsPerTablet-th bucket
+ List<Bytes> dataSplits = new ArrayList<>();
+ for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+ String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+ rowBuilder.setLength(mapId.length());
+ dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
+ }
+ Collections.sort(dataSplits);
+
+ // the same split pattern for the update range
+ List<Bytes> updateSplits = new ArrayList<>();
+ for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
+ String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+ rowBuilder.setLength(mapId.length());
+ updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
+ }
+ Collections.sort(updateSplits);
+
+ Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+ Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+
+ // the range end markers are split points too, followed by the per bucket splits
+ List<Bytes> splits = new ArrayList<>();
+ splits.add(dataRangeEnd);
+ splits.add(updateRangeEnd);
+ splits.addAll(dataSplits);
+ splits.addAll(updateSplits);
+
- Pirtos pirtos = new Pirtos();
- pirtos.setSplits(splits);
++ TableOptimizations tableOptim = new TableOptimizations();
++ tableOptim.setSplits(splits);
+
- pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
++ tableOptim.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
+
- return pirtos;
++ return tableOptim;
+ }
+
+ /**
+  * Encodes a long as eight bytes, most significant byte first.
+  */
+ private static byte[] encSeq(long l) {
+   byte[] encoded = new byte[Long.BYTES];
+   for (int i = 0; i < encoded.length; i++) {
+     // byte 0 takes the top eight bits, the last byte the lowest eight
+     int shift = 8 * (encoded.length - 1 - i);
+     encoded[i] = (byte) (l >>> shift);
+   }
+   return encoded;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
index 0000000,5a1f5fe..8ceda12
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/common/TestGrouping.java
@@@ -1,0 -1,92 +1,93 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package org.apache.fluo.recipes.core.common;
+
+ import java.util.Set;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+
+ import com.google.common.collect.ImmutableSet;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.data.Bytes;
+ import org.apache.fluo.recipes.core.export.ExportQueue;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+ import org.junit.Assert;
+ import org.junit.Test;
+
+ /**
+ * Checks that the tablet grouping regex obtained by merging the table optimizations of several
+ * collision free maps and export queues assigns rows and split points to the expected groups.
+ */
+ public class TestGrouping {
+ @Test
+ public void testTabletGrouping() {
+ FluoConfiguration conf = new FluoConfiguration();
+
+ // two maps and two export queues, so the merged regex has to tell all of them apart
+ CollisionFreeMap.configure(conf, new Options("m1", "ct", "kt", "vt", 119));
+ CollisionFreeMap.configure(conf, new Options("m2", "ct", "kt", "vt", 3));
+
+ ExportQueue.configure(conf, new ExportQueue.Options("eq1", "et", "kt", "vt", 7));
+ ExportQueue.configure(conf, new ExportQueue.Options("eq2", "et", "kt", "vt", 3));
+
- Pirtos pirtos = CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
- pirtos.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
++ TableOptimizations tableOptim =
++ CollisionFreeMap.getTableOptimizations(conf.getAppConfiguration());
++ tableOptim.merge(ExportQueue.getTableOptimizations(conf.getAppConfiguration()));
+
- Pattern pattern = Pattern.compile(pirtos.getTabletGroupingRegex());
++ Pattern pattern = Pattern.compile(tableOptim.getTabletGroupingRegex());
+
+ // rows of configured maps group by their data or update prefix, unknown ids do not group
+ Assert.assertEquals("m1:u:", group(pattern, "m1:u:f0c"));
+ Assert.assertEquals("m1:d:", group(pattern, "m1:d:f0c"));
+ Assert.assertEquals("m2:u:", group(pattern, "m2:u:abc"));
+ Assert.assertEquals("m2:d:", group(pattern, "m2:d:590"));
+ Assert.assertEquals("none", group(pattern, "m3:d:590"));
+
+ Assert.assertEquals("eq1:", group(pattern, "eq1:f0c"));
+ Assert.assertEquals("eq2:", group(pattern, "eq2:f0c"));
+ Assert.assertEquals("none", group(pattern, "eq3:f0c"));
+
+ // validate the assumptions this test is making
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1#")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2#")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq1:~")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("eq2:~")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:u:~")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m1:d:~")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:u:~")));
- Assert.assertTrue(pirtos.getSplits().contains(Bytes.of("m2:d:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq1#")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq2#")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq1:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("eq2:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m1:u:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m1:d:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m2:u:~")));
++ Assert.assertTrue(tableOptim.getSplits().contains(Bytes.of("m2:d:~")));
+
+ Set<String> expectedGroups =
+ ImmutableSet.of("m1:u:", "m1:d:", "m2:u:", "m2:d:", "eq1:", "eq2:");
+
+ // ensure all splits group as expected
- for (Bytes split : pirtos.getSplits()) {
++ for (Bytes split : tableOptim.getSplits()) {
+ String g = group(pattern, split.toString());
+
+ if (expectedGroups.contains(g)) {
+ Assert.assertTrue(split.toString().startsWith(g));
+ } else {
+ // the only splits expected to fall outside of every group are the "eq1#" and "eq2#" ones
+ Assert.assertEquals("none", g);
+ Assert.assertTrue(split.toString().equals("eq1#") || split.toString().equals("eq2#"));
+ }
+
+ }
+
+ }
+
+ /**
+ * Returns the first capture group when the pattern matches the whole end row and defines exactly
+ * one group, otherwise the string "none".
+ */
+ private String group(Pattern pattern, String endRow) {
+ Matcher m = pattern.matcher(endRow);
+ if (m.matches() && m.groupCount() == 1) {
+ return m.group(1);
+ }
+ return "none";
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
index 0000000,a359598..8259469
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java
@@@ -1,0 -1,75 +1,76 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package org.apache.fluo.recipes.core.map;
+
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.List;
+
+ import com.google.common.collect.Lists;
+ import org.apache.fluo.api.config.FluoConfiguration;
+ import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
+ import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options;
+ import org.junit.Assert;
+ import org.junit.Test;
+
+ /**
+ * Checks the split points suggested for collision free maps, both for a single map and for all
+ * configured maps together.
+ */
+ public class SplitsTest {
+ /**
+ * Returns a sorted copy of the given list, leaving the input untouched.
+ */
+ private static List<Bytes> sort(List<Bytes> in) {
+ ArrayList<Bytes> out = new ArrayList<>(in);
+ Collections.sort(out);
+ return out;
+ }
+
+ @Test
+ public void testSplits() {
+
+ // 3 buckets with one bucket per tablet: expect a split at buckets 1 and 2 plus the range ends
+ Options opts = new Options("foo", WordCountCombiner.class, String.class, Long.class, 3);
+ opts.setBucketsPerTablet(1);
+ FluoConfiguration fluoConfig = new FluoConfiguration();
+ CollisionFreeMap.configure(fluoConfig, opts);
+
- Pirtos pirtos1 =
++ TableOptimizations tableOptim1 =
+ CollisionFreeMap.getTableOptimizations("foo", fluoConfig.getAppConfiguration());
+ List<Bytes> expected1 =
+ Lists.transform(
+ Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"),
+ Bytes::of);
+
- Assert.assertEquals(expected1, sort(pirtos1.getSplits()));
++ Assert.assertEquals(expected1, sort(tableOptim1.getSplits()));
+
+ // 6 buckets with two buckets per tablet: expect a split at buckets 2 and 4 plus the range ends
+ Options opts2 = new Options("bar", WordCountCombiner.class, String.class, Long.class, 6);
+ opts2.setBucketsPerTablet(2);
+ CollisionFreeMap.configure(fluoConfig, opts2);
+
- Pirtos pirtos2 =
++ TableOptimizations tableOptim2 =
+ CollisionFreeMap.getTableOptimizations("bar", fluoConfig.getAppConfiguration());
+ List<Bytes> expected2 =
+ Lists.transform(
+ Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"),
+ Bytes::of);
- Assert.assertEquals(expected2, sort(pirtos2.getSplits()));
++ Assert.assertEquals(expected2, sort(tableOptim2.getSplits()));
+
+ // asking for all configured maps must give the splits of both maps
- Pirtos pirtos3 = CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
++ TableOptimizations tableOptim3 =
++ CollisionFreeMap.getTableOptimizations(fluoConfig.getAppConfiguration());
+
+ // "bar" sorts before "foo", so the sorted result lists the splits of "bar" first
+ ArrayList<Bytes> expected3 = new ArrayList<>(expected2);
+ expected3.addAll(expected1);
+
- Assert.assertEquals(expected3, sort(pirtos3.getSplits()));
++ Assert.assertEquals(expected3, sort(tableOptim3.getSplits()));
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/22354d0f/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
----------------------------------------------------------------------
diff --cc modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
index 07cff67,c1adc3b..00795f4
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java
@@@ -31,7 -31,7 +31,7 @@@ import org.apache.fluo.api.client.FluoF
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.recipes.accumulo.ops.TableOperations;
- import org.apache.fluo.recipes.common.TableOptimizations;
-import org.apache.fluo.recipes.core.common.Pirtos;
++import org.apache.fluo.recipes.core.common.TableOptimizations;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;