You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:54 UTC
[06/10] incubator-fluo-recipes git commit: Updated package names in
core module
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
new file mode 100644
index 0000000..61fec47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.BytesBuilder;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.common.Pirtos;
+import org.apache.fluo.recipes.core.common.RowRange;
+import org.apache.fluo.recipes.core.common.TransientRegistry;
+import org.apache.fluo.recipes.core.impl.BucketUtil;
+import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
+
+/**
+ * See the project level documentation for information about this recipe.
+ */
+public class CollisionFreeMap<K, V> {
+
+ private static final String UPDATE_RANGE_END = ":u:~"; // mapId + this is used as a range boundary and split point for update rows
+
+ private static final String DATA_RANGE_END = ":d:~"; // mapId + this is used as a range boundary and split point for data rows
+
+ private String mapId; // copied from Options; first component of every row this map builds
+
+ private Class<K> keyType; // class passed to the serializer when deserializing keys
+ private Class<V> valType; // class passed to the serializer when deserializing values
+ private SimpleSerializer serializer;
+ private Combiner<K, V> combiner; // merges queued updates (and the current value) into a new value
+ UpdateObserver<K, V> updateObserver; // package-private: CollisionFreeMapObserver calls init on it
+ private long bufferSize; // approximate limit on serialized update bytes buffered per process call
+
+ static final Column UPDATE_COL = new Column("u", "v"); // column holding a queued update's serialized value
+ static final Column NEXT_COL = new Column("u", "next"); // written by process with the key to start from next time
+
+ private int numBuckets = -1; // stays -1 until assigned from Options in the constructor
+
+ @SuppressWarnings("unchecked")
+ CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
+
+ this.mapId = opts.mapId;
+ // TODO defer loading classes
+ // TODO centralize class loading
+ // TODO try to check type params
+ this.numBuckets = opts.numBuckets;
+ this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
+ this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
+ this.combiner =
+ (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance();
+ this.serializer = serializer;
+ if (opts.updateObserverType != null) {
+ this.updateObserver =
+ getClass().getClassLoader().loadClass(opts.updateObserverType)
+ .asSubclass(UpdateObserver.class).newInstance();
+ } else {
+ this.updateObserver = new NullUpdateObserver<>();
+ }
+ this.bufferSize = opts.getBufferSize();
+ }
+
+ private V deserVal(Bytes val) {
+ return serializer.deserialize(val.toArray(), valType);
+ }
+
+ private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
+ return row.subSequence(prefix.length(), row.length() - 8);
+ }
+
+  /**
+   * Reads the updates queued under the notified row, combines them per key with the currently
+   * stored value using the configured {@link Combiner}, and writes the results back.
+   *
+   * <p>
+   * Updates are buffered until roughly {@code bufferSize} bytes of serialized rows and values have
+   * been read. When more updates remain, the key to resume from is stored in {@code NEXT_COL} and a
+   * weak notification is set on the same row so that a later transaction continues the work. A key
+   * whose updates were only partially read is not merged with its stored value; instead the
+   * combined partial result is queued again as a single update.
+   *
+   * @param tx transaction used for all reads and writes
+   * @param ntfyRow the notified row, which is also the row prefix of the queued updates
+   * @param col the notified column, reused when this method notifies itself
+   * @throws Exception declared for observer compatibility
+   */
+  void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
+
+    Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
+
+    ScannerConfiguration sc = new ScannerConfiguration();
+
+    if (nextKey != null) {
+      // a previous call stopped early; resume after the recorded position instead of rescanning
+      // the whole bucket
+      Bytes startRow =
+          Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
+              .toBytes();
+      Span tmpSpan = Span.prefix(ntfyRow);
+      Span nextSpan =
+          new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
+              tmpSpan.isEndInclusive());
+      sc.setSpan(nextSpan);
+    } else {
+      sc.setSpan(Span.prefix(ntfyRow));
+    }
+
+    // NOTE: the span chosen above must not be reset here; doing so discards the resume position
+    sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
+    RowIterator iter = tx.get(sc);
+
+    Map<Bytes, List<Bytes>> updates = new HashMap<>();
+
+    long approxMemUsed = 0;
+
+    Bytes partiallyReadKey = null;
+
+    if (iter.hasNext()) {
+      Bytes lastKey = null;
+      while (iter.hasNext() && approxMemUsed < bufferSize) {
+        Entry<Bytes, ColumnIterator> rowCol = iter.next();
+        Bytes curRow = rowCol.getKey();
+
+        tx.delete(curRow, UPDATE_COL);
+
+        Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
+        lastKey = serializedKey;
+
+        Bytes val = rowCol.getValue().next().getValue();
+        updates.computeIfAbsent(serializedKey, k -> new ArrayList<>()).add(val);
+
+        approxMemUsed += curRow.length();
+        approxMemUsed += val.length();
+      }
+
+      if (iter.hasNext()) {
+        Entry<Bytes, ColumnIterator> rowCol = iter.next();
+        Bytes curRow = rowCol.getKey();
+
+        // check if more updates for last key
+        if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
+          // there are still more updates for this key
+          partiallyReadKey = lastKey;
+
+          // start next time at the current key
+          tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
+        } else {
+          // start next time at the next possible key
+          Bytes nextPossible =
+              Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
+                  .toBytes();
+          tx.set(ntfyRow, NEXT_COL, nextPossible);
+        }
+
+        // may not read all data because of mem limit, so notify self
+        tx.setWeakNotification(ntfyRow, col);
+      } else if (nextKey != null) {
+        // everything was read, clear nextKey
+        tx.delete(ntfyRow, NEXT_COL);
+      }
+    } else if (nextKey != null) {
+      tx.delete(ntfyRow, NEXT_COL);
+    }
+
+    byte[] dataPrefix = ntfyRow.toArray();
+    // TODO this is awful... no sanity check... hard to read
+    // flips the 'u' in "<mapId>:u:" to 'd' so the prefix addresses the data rows
+    dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(dataPrefix);
+    int rowPrefixLen = rowBuilder.getLength();
+
+    Set<Bytes> keysToFetch = updates.keySet();
+    if (partiallyReadKey != null) {
+      // the partially read key is requeued below, so its current value is not needed
+      final Bytes prk = partiallyReadKey;
+      keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
+    }
+    Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
+
+    ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
+
+    for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
+      rowBuilder.setLength(rowPrefixLen);
+      Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
+      Bytes currVal =
+          currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
+
+      Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
+
+      K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
+
+      if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
+        // not all updates were read for this key, so requeue the combined updates as an update
+        Optional<V> nv = combiner.combine(kd, ui);
+        if (nv.isPresent()) {
+          update(tx, Collections.singletonMap(kd, nv.get()));
+        }
+      } else {
+        Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
+        Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
+        // only write and report when the stored value actually changes
+        if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
+          if (newVal == null) {
+            tx.delete(currentValueRow, DATA_COLUMN);
+          } else {
+            tx.set(currentValueRow, DATA_COLUMN, newVal);
+          }
+
+          Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
+          updatesToReport.add(new Update<>(kd, cvd, nv));
+        }
+      }
+    }
+
+    // TODO could clear these as converted to objects to avoid double memory usage
+    updates.clear();
+    currentVals.clear();
+
+    if (!updatesToReport.isEmpty()) {
+      updateObserver.updatingValues(tx, updatesToReport.iterator());
+    }
+  }
+
+ private static final Column DATA_COLUMN = new Column("data", "current"); // column in which a key's current combined value is stored
+
+ private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
+ Set<Bytes> keySet) {
+
+ Set<Bytes> rows = new HashSet<>();
+
+ int prefixLen = prefix.getLength();
+ for (Bytes key : keySet) {
+ prefix.setLength(prefixLen);
+ rows.add(prefix.append(key).toBytes());
+ }
+
+ try {
+ return tx.get(rows, Collections.singleton(DATA_COLUMN));
+ } catch (IllegalArgumentException e) {
+ System.out.println(rows.size());
+ throw e;
+ }
+ }
+
+ private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
+ if (currentVal == null) {
+ return updates;
+ }
+
+ return Iterators.concat(updates, Iterators.singletonIterator(deserVal(currentVal)));
+ }
+
+  /**
+   * This method will retrieve the current value for key and any outstanding updates and combine
+   * them using the configured {@link Combiner}. The result from the combiner is returned.
+   *
+   * @param tx snapshot or transaction used to read
+   * @param key key to look up
+   * @return the stored value when no updates are queued, the combiner's result when updates are
+   *         queued, or {@code null} when neither yields a value
+   */
+  public V get(SnapshotBase tx, K key) {
+
+    byte[] k = serializer.serialize(key);
+
+    int hash = Hashing.murmur3_32().hashBytes(k).asInt();
+    String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId);
+    // remember the encoded length of the map id; String.length() counts chars, not bytes, and
+    // would be wrong for an id that is not pure ASCII
+    final int mapIdLen = rowBuilder.getLength();
+    rowBuilder.append(":u:").append(bucketId).append(":").append(k);
+
+    // NOTE(review): a prefix scan also matches update rows of longer keys that start with this
+    // key's bytes - confirm the serializer never produces one key that is a prefix of another
+    ScannerConfiguration sc = new ScannerConfiguration();
+    sc.setSpan(Span.prefix(rowBuilder.toBytes()));
+
+    RowIterator iter = tx.get(sc);
+
+    Iterator<V> ui;
+
+    if (iter.hasNext()) {
+      ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
+    } else {
+      ui = Collections.<V>emptyList().iterator();
+    }
+
+    // rebuild the row with the data marker to read the current value
+    rowBuilder.setLength(mapIdLen);
+    rowBuilder.append(":d:").append(bucketId).append(":").append(k);
+
+    Bytes dataRow = rowBuilder.toBytes();
+
+    Bytes cv = tx.get(dataRow, DATA_COLUMN);
+
+    if (!ui.hasNext()) {
+      // no queued updates, so the stored value (if any) is the answer and the combiner is skipped
+      return cv == null ? null : deserVal(cv);
+    }
+
+    return combiner.combine(key, concat(ui, cv)).orElse(null);
+  }
+
+ String getId() { // package-private accessor for the id this map was created with
+ return mapId;
+ }
+
+  /**
+   * Queues updates for a collision free map. The queued updates are applied later by an Observer
+   * running in a different transaction, so this method does not collide with other transactions
+   * queuing updates for the same keys.
+   *
+   * @param tx This transaction will be used to make the updates.
+   * @param updates The keys in the map should correspond to keys in the collision free map being
+   *        updated. The values in the map will be queued for updating.
+   */
+  public void update(TransactionBase tx, Map<K, V> updates) {
+    Preconditions.checkState(numBuckets > 0, "Not initialized");
+
+    final Set<String> touchedBuckets = new HashSet<>();
+
+    final BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId).append(":u:");
+    final int prefixLength = rowBuilder.getLength();
+
+    // every row queued by this transaction ends with its encoded start timestamp
+    final byte[] startTs = encSeq(tx.getStartTimestamp());
+
+    for (Entry<K, V> update : updates.entrySet()) {
+      final byte[] serializedKey = serializer.serialize(update.getKey());
+      final int keyHash = Hashing.murmur3_32().hashBytes(serializedKey).asInt();
+      final String bucketId = BucketUtil.genBucketId(Math.abs(keyHash % numBuckets), numBuckets);
+
+      // rewind the builder to the shared "<mapId>:u:" prefix before adding this entry's suffix
+      rowBuilder.setLength(prefixLength);
+      rowBuilder.append(bucketId).append(":").append(serializedKey).append(startTs);
+      final Bytes updateRow = rowBuilder.toBytes();
+      final Bytes serializedValue = Bytes.of(serializer.serialize(update.getValue()));
+
+      // TODO set if not exists would be comforting here.... but
+      // collisions on bucketId+key+uuid should never occur
+      tx.set(updateRow, UPDATE_COL, serializedValue);
+
+      touchedBuckets.add(bucketId);
+    }
+
+    // notify once per bucket that received at least one update
+    final Column notifyColumn = new Column("fluoRecipes", "cfm:" + mapId);
+    for (String bucketId : touchedBuckets) {
+      rowBuilder.setLength(prefixLength);
+      final Bytes notifyRow = rowBuilder.append(bucketId).append(":").toBytes();
+      tx.setWeakNotification(notifyRow, notifyColumn);
+    }
+  }
+
+
+  /**
+   * Creates a map instance from the options previously saved in the application configuration.
+   *
+   * @param mapId id of a configured collision free map
+   * @param appConf application configuration holding the map's saved options
+   * @return a map bound to the configured key, value and combiner types
+   * @throws IllegalStateException if the configured classes cannot be loaded or instantiated; the
+   *         original failure is kept as the cause
+   */
+  public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
+      SimpleConfiguration appConf) {
+    Options opts = new Options(mapId, appConf);
+    try {
+      return new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf));
+    } catch (Exception e) {
+      // still unchecked, but now names the map that failed
+      throw new IllegalStateException("Failed to create collision free map with id " + mapId, e);
+    }
+  }
+
+ /**
+ * A {@link CollisionFreeMap} stores data in its own data format in the Fluo table. When
+ * initializing a Fluo table with something like Map Reduce or Spark, data will need to be written
+ * in this format. That is the purpose of this method: it provides a simple class that can do this
+ * conversion.
+ *
+ * @param mapId id of the map the converted data belongs to
+ * @param numBuckets number of buckets used to spread the map's keys
+ * @param serializer serializer used for keys and values
+ */
+ public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
+ SimpleSerializer serializer) {
+ return new Initializer<>(mapId, numBuckets, serializer);
+ }
+
+
+  /**
+   * Converts key/value pairs into the row, column and value layout used for a map's data rows.
+   *
+   * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
+   */
+  public static class Initializer<K2, V2> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String mapId;
+
+    private SimpleSerializer serializer;
+
+    private int numBuckets = -1;
+
+    private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
+      this.mapId = mapId;
+      this.numBuckets = numBuckets;
+      this.serializer = serializer;
+    }
+
+    /**
+     * Produces the entry that stores {@code val} as the current value of {@code key}.
+     *
+     * @param key map key
+     * @param val value to store for the key
+     * @return row, data column and serialized value for the key
+     */
+    public RowColumnValue convert(K2 key, V2 val) {
+      final byte[] serializedKey = serializer.serialize(key);
+      final int keyHash = Hashing.murmur3_32().hashBytes(serializedKey).asInt();
+      final String bucketId = BucketUtil.genBucketId(Math.abs(keyHash % numBuckets), numBuckets);
+
+      final BytesBuilder rowBuilder = Bytes.newBuilder();
+      rowBuilder.append(mapId).append(":d:").append(bucketId).append(":").append(serializedKey);
+      final Bytes dataRow = rowBuilder.toBytes();
+
+      final Bytes serializedValue = Bytes.of(serializer.serialize(val));
+      return new RowColumnValue(dataRow, DATA_COLUMN, serializedValue);
+    }
+  }
+
+ public static class Options {
+
+ static final long DEFAULT_BUFFER_SIZE = 1 << 22;
+ static final int DEFAULT_BUCKETS_PER_TABLET = 10;
+
+ int numBuckets;
+ Integer bucketsPerTablet = null;
+
+ Long bufferSize;
+
+ String keyType;
+ String valueType;
+ String combinerType;
+ String updateObserverType;
+ String mapId;
+
+ private static final String PREFIX = "recipes.cfm.";
+
+ Options(String mapId, SimpleConfiguration appConfig) {
+ this.mapId = mapId;
+
+ this.numBuckets = appConfig.getInt(PREFIX + mapId + ".buckets");
+ this.combinerType = appConfig.getString(PREFIX + mapId + ".combiner");
+ this.keyType = appConfig.getString(PREFIX + mapId + ".key");
+ this.valueType = appConfig.getString(PREFIX + mapId + ".val");
+ this.updateObserverType = appConfig.getString(PREFIX + mapId + ".updateObserver", null);
+ this.bufferSize = appConfig.getLong(PREFIX + mapId + ".bufferSize", DEFAULT_BUFFER_SIZE);
+ this.bucketsPerTablet =
+ appConfig.getInt(PREFIX + mapId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+ }
+
+ public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
+ Preconditions.checkArgument(buckets > 0);
+ Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+
+ this.mapId = mapId;
+ this.numBuckets = buckets;
+ this.combinerType = combinerType;
+ this.updateObserverType = null;
+ this.keyType = keyType;
+ this.valueType = valType;
+ }
+
+ public Options(String mapId, String combinerType, String updateObserverType, String keyType,
+ String valueType, int buckets) {
+ Preconditions.checkArgument(buckets > 0);
+ Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
+
+ this.mapId = mapId;
+ this.numBuckets = buckets;
+ this.combinerType = combinerType;
+ this.updateObserverType = updateObserverType;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ /**
+ * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
+ * be used to actually deserialize and process the updates. This limit does not account for
+ * object overhead in java, which can be significant.
+ *
+ * <p>
+ * The way memory read is calculated is by summing the length of serialized key and value byte
+ * arrays. Once this sum exceeds the configured memory limit, no more update key values are
+ * processed in the current transaction. When not everything is processed, the observer
+ * processing updates will notify itself causing another transaction to continue processing
+ * later
+ */
+ public Options setBufferSize(long bufferSize) {
+ Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ long getBufferSize() {
+ if (bufferSize == null) {
+ return DEFAULT_BUFFER_SIZE;
+ }
+
+ return bufferSize;
+ }
+
+ /**
+ * Sets the number of buckets per tablet to generate. This affects how many split points will be
+ * generated when optimizing the Accumulo table.
+ *
+ */
+ public Options setBucketsPerTablet(int bucketsPerTablet) {
+ Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
+ + bucketsPerTablet);
+ this.bucketsPerTablet = bucketsPerTablet;
+ return this;
+ }
+
+ int getBucketsPerTablet() {
+ if (bucketsPerTablet == null) {
+ return DEFAULT_BUCKETS_PER_TABLET;
+ }
+
+ return bucketsPerTablet;
+ }
+
+ public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
+ Class<V> valueType, int buckets) {
+ this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
+ }
+
+ public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
+ Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
+ int buckets) {
+ this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType
+ .getName(), buckets);
+ }
+
+ void save(SimpleConfiguration appConfig) {
+ appConfig.setProperty(PREFIX + mapId + ".buckets", numBuckets + "");
+ appConfig.setProperty(PREFIX + mapId + ".combiner", combinerType + "");
+ appConfig.setProperty(PREFIX + mapId + ".key", keyType);
+ appConfig.setProperty(PREFIX + mapId + ".val", valueType);
+ if (updateObserverType != null) {
+ appConfig.setProperty(PREFIX + mapId + ".updateObserver", updateObserverType + "");
+ }
+ if (bufferSize != null) {
+ appConfig.setProperty(PREFIX + mapId + ".bufferSize", bufferSize);
+ }
+ if (bucketsPerTablet != null) {
+ appConfig.setProperty(PREFIX + mapId + ".bucketsPerTablet", bucketsPerTablet);
+ }
+ }
+ }
+
+  /**
+   * Configures a collision free map for use: saves its options in the application configuration,
+   * registers the observer that processes its updates, and registers a transient range for the
+   * map. It must be called before initializing Fluo.
+   *
+   * @param fluoConfig configuration to modify
+   * @param opts options of the map being configured
+   */
+  public static void configure(FluoConfiguration fluoConfig, Options opts) {
+    opts.save(fluoConfig.getAppConfiguration());
+
+    final ObserverConfiguration observerConfig =
+        new ObserverConfiguration(CollisionFreeMapObserver.class.getName());
+    observerConfig.setParameters(ImmutableMap.of("mapId", opts.mapId));
+    fluoConfig.addObserver(observerConfig);
+
+    final RowRange transientRange =
+        new RowRange(Bytes.of(opts.mapId + DATA_RANGE_END), Bytes.of(opts.mapId + UPDATE_RANGE_END));
+
+    final TransientRegistry registry = new TransientRegistry(fluoConfig.getAppConfiguration());
+    registry.addTransientRange("cfm." + opts.mapId, transientRange);
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for all previously configured collision free maps.
+   * Map ids are discovered from the configuration keys that start with the recipe's key prefix.
+   *
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
+    final int prefixLen = Options.PREFIX.length();
+    // getKeys is given the prefix without its trailing '.'
+    final String keyPrefix = Options.PREFIX.substring(0, prefixLen - 1);
+
+    HashSet<String> mapIds = new HashSet<>();
+    Iterator<String> keys = appConfig.getKeys(keyPrefix);
+    while (keys.hasNext()) {
+      // the map id is the first dot separated component after the prefix
+      String afterPrefix = keys.next().substring(prefixLen);
+      mapIds.add(afterPrefix.split("\\.", 2)[0]);
+    }
+
+    Pirtos pirtos = new Pirtos();
+    for (String mid : mapIds) {
+      pirtos.merge(getTableOptimizations(mid, appConfig));
+    }
+
+    return pirtos;
+  }
+
+  /**
+   * Return suggested Fluo table optimizations for the specified collision free map. The result
+   * holds split points for the map's data and update rows (one every {@code bucketsPerTablet}
+   * buckets, plus the end of each range) and a tablet grouping regex matching both row kinds.
+   *
+   * @param mapId id of a configured collision free map
+   * @param appConfig Must pass in the application configuration obtained from
+   *        {@code FluoClient.getAppConfiguration()} or
+   *        {@code FluoConfiguration.getAppConfiguration()}
+   */
+  public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
+    Options opts = new Options(mapId, appConfig);
+
+    BytesBuilder rowBuilder = Bytes.newBuilder();
+    rowBuilder.append(mapId);
+    // encoded length of the id; String.length() counts chars and is wrong for non-ASCII ids
+    final int mapIdLen = rowBuilder.getLength();
+
+    final int step = opts.getBucketsPerTablet();
+
+    List<Bytes> dataSplits = new ArrayList<>();
+    List<Bytes> updateSplits = new ArrayList<>();
+    for (int i = step; i < opts.numBuckets; i += step) {
+      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
+
+      rowBuilder.setLength(mapIdLen);
+      dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
+
+      rowBuilder.setLength(mapIdLen);
+      updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
+    }
+    Collections.sort(dataSplits);
+    Collections.sort(updateSplits);
+
+    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
+    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
+
+    List<Bytes> splits = new ArrayList<>();
+    splits.add(dataRangeEnd);
+    splits.add(updateRangeEnd);
+    splits.addAll(dataSplits);
+    splits.addAll(updateSplits);
+
+    Pirtos pirtos = new Pirtos();
+    pirtos.setSplits(splits);
+
+    pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
+
+    return pirtos;
+  }
+
+ private static byte[] encSeq(long l) {
+ byte[] ret = new byte[8];
+ ret[0] = (byte) (l >>> 56);
+ ret[1] = (byte) (l >>> 48);
+ ret[2] = (byte) (l >>> 40);
+ ret[3] = (byte) (l >>> 32);
+ ret[4] = (byte) (l >>> 24);
+ ret[5] = (byte) (l >>> 16);
+ ret[6] = (byte) (l >>> 8);
+ ret[7] = (byte) (l >>> 0);
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
new file mode 100644
index 0000000..96890af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMapObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * Observer that processes the queued updates of one collision free map. It is registered by
+ * {@link CollisionFreeMap#configure(FluoConfiguration, CollisionFreeMap.Options)} and should
+ * never have to be used directly. The map to serve is identified by the "mapId" observer
+ * parameter.
+ */
+
+public class CollisionFreeMapObserver extends AbstractObserver {
+
+ @SuppressWarnings("rawtypes")
+ private CollisionFreeMap cfm; // created in init from the application configuration
+ private String mapId; // value of the "mapId" observer parameter
+
+ public CollisionFreeMapObserver() {}
+
+ @Override
+ public void init(Context context) throws Exception {
+ this.mapId = context.getParameters().get("mapId");
+ cfm = CollisionFreeMap.getInstance(mapId, context.getAppConfiguration());
+ cfm.updateObserver.init(mapId, context); // give the configured update observer a chance to set up
+ }
+
+ @Override
+ public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ cfm.process(tx, row, col); // all work is delegated to the map
+ }
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ // TODO constants
+ // must match the column CollisionFreeMap.update sets weak notifications on
+ return new ObservedColumn(new Column("fluoRecipes", "cfm:" + mapId), NotificationType.WEAK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
new file mode 100644
index 0000000..c9d468b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Combiner.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+public interface Combiner<K, V> {
+ /**
+ * This function is called to combine the current value of a key with updates that were queued for
+ * the key. See the collision free map project level documentation for more information.
+ *
+ * @param key the key whose values are being combined
+ * @param updates the queued updates for the key; when the current value is included, it comes
+ * last
+ * @return The new value for the key. Returning {@code Optional.empty()} will cause the key to be
+ * deleted.
+ */
+
+ Optional<V> combine(K key, Iterator<V> updates);
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
new file mode 100644
index 0000000..7bbfec1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/NullUpdateObserver.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.client.TransactionBase;
+
+class NullUpdateObserver<K, V> extends UpdateObserver<K, V> { // default used when no update observer is configured
+ @Override
+ public void updatingValues(TransactionBase tx, Iterator<Update<K, V>> updates) {} // intentionally does nothing
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
new file mode 100644
index 0000000..10e718e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Optional;
+
+/**
+ * Immutable description of a change to one key of a collision free map: the key, the value it had
+ * before the change (if any) and the value it has after the change (if any).
+ */
+public class Update<K, V> {
+
+  private final K key;
+  private final Optional<V> oldValue;
+  private final Optional<V> newValue;
+
+  /**
+   * @param key key that changed
+   * @param oldValue value before the change, empty when there was none
+   * @param newValue value after the change, empty when the key was removed
+   */
+  Update(K key, Optional<V> oldValue, Optional<V> newValue) {
+    this.newValue = newValue;
+    this.oldValue = oldValue;
+    this.key = key;
+  }
+
+  /** @return the key that changed */
+  public K getKey() {
+    return this.key;
+  }
+
+  /** @return the value after the change, if any */
+  public Optional<V> getNewValue() {
+    return this.newValue;
+  }
+
+  /** @return the value before the change, if any */
+  public Optional<V> getOldValue() {
+    return this.oldValue;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
new file mode 100644
index 0000000..2e48451
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/UpdateObserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.map;
+
+import java.util.Iterator;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * A {@link CollisionFreeMap} calls this to allow additional processing to be done when key values
+ * are updated. See the project-level documentation for more information.
+ */
+
+public abstract class UpdateObserver<K, V> {
+ public void init(String mapId, Context observerContext) throws Exception {} // no-op by default
+ /** Implemented by subclasses; receives a transaction and an iterator over {@link Update}s. */
+ public abstract void updatingValues(TransactionBase tx, Iterator<Update<K, V>> updates);
+
+ // TODO add close
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
new file mode 100644
index 0000000..589b9cc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.serialization;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+/** Converts objects to and from byte arrays; {@link #getInstance} picks the implementation. */
+public interface SimpleSerializer {
+
+ /**
+ * Called immediately after construction and passed Fluo application configuration.
+ */
+ public void init(SimpleConfiguration appConfig);
+ /** Serializes {@code obj} into a byte array. */
+ // TODO refactor to support reuse of objects and byte arrays???
+ public <T> byte[] serialize(T obj);
+ /** Deserializes {@code serObj} into an object of type {@code clazz}. */
+ public <T> T deserialize(byte[] serObj, Class<T> clazz);
+ /** Class-based overload; forwards the class name. NOTE(review): name looks like a typo. */
+ public static void setSetserlializer(FluoConfiguration fluoConfig,
+ Class<? extends SimpleSerializer> serializerType) {
+ setSetserlializer(fluoConfig, serializerType.getName());
+ }
+ /** Stores the serializer class name under the recipes.serializer application property. */
+ public static void setSetserlializer(FluoConfiguration fluoConfig, String serializerType) {
+ fluoConfig.getAppConfiguration().setProperty("recipes.serializer", serializerType);
+ }
+ /** Loads the configured serializer class, instantiates it, calls init and returns it. */
+ public static SimpleSerializer getInstance(SimpleConfiguration appConfig) {
+ String serType =
+ appConfig.getString("recipes.serializer",
+ "org.apache.fluo.recipes.kryo.KryoSimplerSerializer"); // presumably the fallback if unset
+ try {
+ SimpleSerializer simplerSer =
+ SimpleSerializer.class.getClassLoader().loadClass(serType)
+ .asSubclass(SimpleSerializer.class).newInstance();
+ simplerSer.init(appConfig);
+ return simplerSer;
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new RuntimeException(e); // rethrown unchecked with the original exception as cause
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
new file mode 100644
index 0000000..fa25b63
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * Logs an operation (i.e. GET, SET, or DELETE) in a Transaction. Multiple LogEntry objects make up
+ * a {@link TxLog}.
+ */
+public class LogEntry {
+ /** The kind of transaction operation an entry records. */
+ public enum Operation {
+ GET, SET, DELETE
+ }
+
+ private Operation op;
+ private Bytes row;
+ private Column col;
+ private Bytes value;
+ // No-arg constructor leaves every field null; presumably kept for a serializer - TODO confirm.
+ private LogEntry() {}
+ // Rejects null arguments with a NullPointerException before storing them.
+ private LogEntry(Operation op, Bytes row, Column col, Bytes value) {
+ Objects.requireNonNull(op);
+ Objects.requireNonNull(row);
+ Objects.requireNonNull(col);
+ Objects.requireNonNull(value);
+ this.op = op;
+ this.row = row;
+ this.col = col;
+ this.value = value;
+ }
+ /** Returns the operation recorded by this entry. */
+ public Operation getOp() {
+ return op;
+ }
+ /** Returns the row of this entry. */
+ public Bytes getRow() {
+ return row;
+ }
+ /** Returns the column of this entry. */
+ public Column getColumn() {
+ return col;
+ }
+ /** Returns the value of this entry. */
+ public Bytes getValue() {
+ return value;
+ }
+ /** Two entries are equal when their operation, row, column and value are all equal. */
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof LogEntry) {
+ LogEntry other = (LogEntry) o;
+ return ((op == other.op) && row.equals(other.row) && col.equals(other.col) && value
+ .equals(other.value));
+ }
+ return false;
+ }
+ /** Combines the hash codes of the same four fields that {@link #equals} compares. */
+ @Override
+ public int hashCode() {
+ int result = op.hashCode();
+ result = 31 * result + row.hashCode();
+ result = 31 * result + col.hashCode();
+ result = 31 * result + value.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "LogEntry{op=" + op + ", row=" + row + ", col=" + col + ", value=" + value + "}";
+ }
+ /** Creates a GET entry; the row and value strings are converted with {@link Bytes#of}. */
+ public static LogEntry newGet(String row, Column col, String value) {
+ return newGet(Bytes.of(row), col, Bytes.of(value));
+ }
+ /** Creates a GET entry for the given row, column and value. */
+ public static LogEntry newGet(Bytes row, Column col, Bytes value) {
+ return new LogEntry(Operation.GET, row, col, value);
+ }
+ /** Creates a SET entry; the row and value strings are converted with {@link Bytes#of}. */
+ public static LogEntry newSet(String row, Column col, String value) {
+ return newSet(Bytes.of(row), col, Bytes.of(value));
+ }
+ /** Creates a SET entry for the given row, column and value. */
+ public static LogEntry newSet(Bytes row, Column col, Bytes value) {
+ return new LogEntry(Operation.SET, row, col, value);
+ }
+ /** Creates a DELETE entry; the row string is converted with {@link Bytes#of}. */
+ public static LogEntry newDelete(String row, Column col) {
+ return newDelete(Bytes.of(row), col);
+ }
+ /** Creates a DELETE entry; its value is set to {@link Bytes#EMPTY}. */
+ public static LogEntry newDelete(Bytes row, Column col) {
+ return new LogEntry(Operation.DELETE, row, col, Bytes.EMPTY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
new file mode 100644
index 0000000..8fb98d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransaction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * An implementation of {@link Transaction} that logs all transaction operations (GET, SET, or
+ * DELETE) in a {@link TxLog} that can be used for exports
+ */
+public class RecordingTransaction extends RecordingTransactionBase implements Transaction {
+
+ private final Transaction tx; // wrapped transaction; commit and close are forwarded to it
+ // Private constructors; instances are obtained through the static wrap methods.
+ private RecordingTransaction(Transaction tx) {
+ super(tx);
+ this.tx = tx;
+ }
+
+ private RecordingTransaction(Transaction tx, Predicate<LogEntry> filter) {
+ super(tx, filter);
+ this.tx = tx;
+ }
+ /** Commits the wrapped transaction. */
+ @Override
+ public void commit() throws CommitException {
+ tx.commit();
+ }
+ /** Closes the wrapped transaction. */
+ @Override
+ public void close() {
+ tx.close();
+ }
+
+ /**
+ * Creates a RecordingTransaction by wrapping an existing Transaction
+ */
+ public static RecordingTransaction wrap(Transaction tx) {
+ return new RecordingTransaction(tx);
+ }
+
+ /**
+ * Creates a RecordingTransaction using the provided LogEntry filter and existing Transaction
+ */
+ public static RecordingTransaction wrap(Transaction tx, Predicate<LogEntry> filter) {
+ return new RecordingTransaction(tx, filter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
new file mode 100644
index 0000000..6303122
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+
+/**
+ * An implementation of {@link TransactionBase} that logs all transaction operations (GET, SET, or
+ * DELETE) in a {@link TxLog} that can be used for exports
+ */
+public class RecordingTransactionBase implements TransactionBase {
+
+ private final TransactionBase txb; // wrapped transaction that every call is forwarded to
+ private final TxLog txLog = new TxLog(); // log of the recorded operations
+ private final Predicate<LogEntry> filter; // an entry is logged only if this test passes
+ // Package-private constructor; the static wrap methods are the public way to build one.
+ RecordingTransactionBase(TransactionBase txb, Predicate<LogEntry> filter) {
+ this.txb = txb;
+ this.filter = filter;
+ }
+ // Same as above, with a filter that accepts every entry.
+ RecordingTransactionBase(TransactionBase txb) {
+ this(txb, le -> true);
+ }
+ /** Forwarded to the wrapped transaction; nothing is logged. */
+ @Override
+ public void setWeakNotification(Bytes row, Column col) {
+ txb.setWeakNotification(row, col);
+ }
+ /** Forwarded to the wrapped transaction; nothing is logged. */
+ @Override
+ public void setWeakNotification(String row, Column col) {
+ txb.setWeakNotification(row, col);
+ }
+ /** Offers a SET entry to the log (subject to the filter), then forwards the call. */
+ @Override
+ public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+ txLog.filteredAdd(LogEntry.newSet(row, col, value), filter);
+ txb.set(row, col, value);
+ }
+ /** Offers a SET entry to the log (subject to the filter), then forwards the call. */
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException {
+ txLog.filteredAdd(LogEntry.newSet(row, col, value), filter);
+ txb.set(row, col, value);
+ }
+ /** Offers a DELETE entry to the log (subject to the filter), then forwards the call. */
+ @Override
+ public void delete(Bytes row, Column col) {
+ txLog.filteredAdd(LogEntry.newDelete(row, col), filter);
+ txb.delete(row, col);
+ }
+ /** Offers a DELETE entry to the log (subject to the filter), then forwards the call. */
+ @Override
+ public void delete(String row, Column col) {
+ txLog.filteredAdd(LogEntry.newDelete(row, col), filter);
+ txb.delete(row, col);
+ }
+
+ /**
+ * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+ */
+ @Override
+ public Bytes get(Bytes row, Column col) {
+ Bytes val = txb.get(row, col);
+ if (val != null) {
+ txLog.filteredAdd(LogEntry.newGet(row, col, val), filter);
+ }
+ return val;
+ }
+
+ /**
+ * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+ */
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ Map<Column, Bytes> colVal = txb.get(row, columns);
+ for (Map.Entry<Column, Bytes> entry : colVal.entrySet()) {
+ txLog.filteredAdd(LogEntry.newGet(row, entry.getKey(), entry.getValue()), filter);
+ }
+ return colVal;
+ }
+
+ /**
+ * Logs GETs for returned Row/Columns. Requests that return no data will not be logged.
+ */
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ Map<Bytes, Map<Column, Bytes>> rowColVal = txb.get(rows, columns);
+ for (Map.Entry<Bytes, Map<Column, Bytes>> rowEntry : rowColVal.entrySet()) {
+ for (Map.Entry<Column, Bytes> colEntry : rowEntry.getValue().entrySet()) {
+ txLog.filteredAdd(
+ LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+ }
+ }
+ return rowColVal;
+ }
+ /** Logs GETs for returned Row/Columns. Requests that return no data will not be logged. */
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+ Map<Bytes, Map<Column, Bytes>> rowColVal = txb.get(rowColumns);
+ for (Map.Entry<Bytes, Map<Column, Bytes>> rowEntry : rowColVal.entrySet()) {
+ for (Map.Entry<Column, Bytes> colEntry : rowEntry.getValue().entrySet()) {
+ txLog.filteredAdd(
+ LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+ }
+ }
+ return rowColVal;
+ }
+
+ /**
+ * Logs GETs for Row/Columns returned by iterators; an entry is logged only when it is actually
+ * consumed via next(). Requests that return no data will not be logged.
+ */
+ @Override
+ public RowIterator get(ScannerConfiguration config) {
+ final RowIterator rowIter = txb.get(config);
+ if (rowIter != null) {
+ return new RowIterator() {
+ // Row-level wrapper: rows pass through, but each row's column iterator gets wrapped.
+ @Override
+ public boolean hasNext() {
+ return rowIter.hasNext();
+ }
+
+ @Override
+ public Map.Entry<Bytes, ColumnIterator> next() {
+ final Map.Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
+ if ((rowEntry != null) && (rowEntry.getValue() != null)) {
+ final ColumnIterator colIter = rowEntry.getValue();
+ return new AbstractMap.SimpleEntry<>(rowEntry.getKey(), new ColumnIterator() {
+ // Column-level wrapper: offers a GET entry to the log for each entry handed out.
+ @Override
+ public boolean hasNext() {
+ return colIter.hasNext();
+ }
+
+ @Override
+ public Map.Entry<Column, Bytes> next() {
+ Map.Entry<Column, Bytes> colEntry = colIter.next();
+ if (colEntry != null) {
+ txLog.filteredAdd(
+ LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()),
+ filter);
+ }
+ return colEntry;
+ }
+ });
+ }
+ return rowEntry;
+ }
+ };
+ }
+ return rowIter;
+ }
+ /** Forwarded to the wrapped transaction. */
+ @Override
+ public long getStartTimestamp() {
+ return txb.getStartTimestamp();
+ }
+ /** Returns the log that this wrapper records into. */
+ public TxLog getTxLog() {
+ return txLog;
+ }
+
+ /**
+ * Creates a RecordingTransactionBase by wrapping an existing TransactionBase
+ */
+ public static RecordingTransactionBase wrap(TransactionBase txb) {
+ return new RecordingTransactionBase(txb);
+ }
+
+ /**
+ * Creates a RecordingTransactionBase using the provided LogEntry filter function and existing
+ * TransactionBase
+ */
+ public static RecordingTransactionBase wrap(TransactionBase txb, Predicate<LogEntry> filter) {
+ return new RecordingTransactionBase(txb, filter);
+ }
+ /** String variant: offers a GET entry to the log for every returned row/column/value. */
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+ Map<String, Map<Column, String>> rowColVal = txb.gets(rowColumns);
+ for (Map.Entry<String, Map<Column, String>> rowEntry : rowColVal.entrySet()) {
+ for (Map.Entry<Column, String> colEntry : rowEntry.getValue().entrySet()) {
+ txLog.filteredAdd(
+ LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+ }
+ }
+ return rowColVal;
+ }
+
+ // TODO a lot of these String methods may be more efficient if they called the Bytes version in
+ // this class... this would avoid conversion from Bytes->String->Bytes
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+ Map<String, Map<Column, String>> rowColVal = txb.gets(rows, columns);
+ for (Map.Entry<String, Map<Column, String>> rowEntry : rowColVal.entrySet()) {
+ for (Map.Entry<Column, String> colEntry : rowEntry.getValue().entrySet()) {
+ txLog.filteredAdd(
+ LogEntry.newGet(rowEntry.getKey(), colEntry.getKey(), colEntry.getValue()), filter);
+ }
+ }
+ return rowColVal;
+ }
+ /** String variant: offers a GET entry to the log only when a non-null value is returned. */
+ @Override
+ public String gets(String row, Column col) {
+ String val = txb.gets(row, col);
+ if (val != null) {
+ txLog.filteredAdd(LogEntry.newGet(row, col, val), filter);
+ }
+ return val;
+ }
+ /** String variant: offers a GET entry to the log for every returned column/value. */
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) {
+ Map<Column, String> colVal = txb.gets(row, columns);
+ for (Map.Entry<Column, String> entry : colVal.entrySet()) {
+ txLog.filteredAdd(LogEntry.newGet(row, entry.getKey(), entry.getValue()), filter);
+ }
+ return colVal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
new file mode 100644
index 0000000..caa3c4d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/TxLog.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.transaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumn;
+
+/**
+ * Contains the list of operations (GET, SET, DELETE) performed during a {@link RecordingTransaction}
+ */
+public class TxLog {
+
+ private List<LogEntry> logEntries = new ArrayList<>(); // entries in the order they were added
+
+ public TxLog() {}
+
+ /**
+ * Adds LogEntry to TxLog
+ */
+ public void add(LogEntry entry) {
+ logEntries.add(entry);
+ }
+
+ /**
+ * Adds LogEntry to TxLog if it passes filter
+ */
+ public void filteredAdd(LogEntry entry, Predicate<LogEntry> filter) {
+ if (filter.test(entry)) {
+ add(entry);
+ }
+ }
+
+ /**
+ * Returns an unmodifiable view of all LogEntry objects in this TxLog
+ */
+ public List<LogEntry> getLogEntries() {
+ return Collections.unmodifiableList(logEntries);
+ }
+
+ /**
+ * Returns true if TxLog is empty
+ */
+ public boolean isEmpty() {
+ return logEntries.isEmpty();
+ }
+
+ /**
+ * Returns a RowColumn-to-value map for the given operation; for duplicates the last entry wins
+ */
+ public Map<RowColumn, Bytes> getOperationMap(LogEntry.Operation op) {
+ Map<RowColumn, Bytes> opMap = new HashMap<>();
+ for (LogEntry entry : logEntries) {
+ if (entry.getOp().equals(op)) {
+ opMap.put(new RowColumn(entry.getRow(), entry.getColumn()), entry.getValue());
+ }
+ }
+ return opMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
new file mode 100644
index 0000000..f437b16
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/Encoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives and Strings to and from {@link Bytes} using the desired encoding
+ *
+ * @since 1.0.0
+ */
+public interface Encoder {
+
+ /**
+ * Encodes an integer to {@link Bytes}
+ */
+ Bytes encode(int i);
+
+ /**
+ * Encodes a long to {@link Bytes}
+ */
+ Bytes encode(long l);
+
+ /**
+ * Encodes a String to {@link Bytes}
+ */
+ Bytes encode(String s);
+
+ /**
+ * Encodes a float to {@link Bytes}
+ */
+ Bytes encode(float f);
+
+ /**
+ * Encodes a double to {@link Bytes}
+ */
+ Bytes encode(double d);
+
+ /**
+ * Encodes a boolean to {@link Bytes}
+ */
+ Bytes encode(boolean b);
+
+ /**
+ * Decodes an integer from {@link Bytes}
+ */
+ int decodeInteger(Bytes b);
+
+ /**
+ * Decodes a long from {@link Bytes}
+ */
+ long decodeLong(Bytes b);
+
+ /**
+ * Decodes a String from {@link Bytes}
+ */
+ String decodeString(Bytes b);
+
+ /**
+ * Decodes a float from {@link Bytes}
+ */
+ float decodeFloat(Bytes b);
+
+ /**
+ * Decodes a double from {@link Bytes}
+ */
+ double decodeDouble(Bytes b);
+
+ /**
+ * Decodes a boolean from {@link Bytes}
+ */
+ boolean decodeBoolean(Bytes b);
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
new file mode 100644
index 0000000..939f97a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/StringEncoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives and Strings to and from bytes using a String encoding
+ *
+ * @since 1.0.0
+ */
+public class StringEncoder implements Encoder {
+ // Numbers and booleans are converted with the matching toString method, then encoded as Strings.
+ @Override
+ public Bytes encode(int i) {
+ return encode(Integer.toString(i));
+ }
+
+ @Override
+ public Bytes encode(long l) {
+ return encode(Long.toString(l));
+ }
+ /** All other encode overloads end up here; the String is handed to {@link Bytes#of}. */
+ @Override
+ public Bytes encode(String s) {
+ return Bytes.of(s);
+ }
+
+ @Override
+ public Bytes encode(float f) {
+ return encode(Float.toString(f));
+ }
+
+ @Override
+ public Bytes encode(double d) {
+ return encode(Double.toString(d));
+ }
+
+ @Override
+ public Bytes encode(boolean b) {
+ return encode(Boolean.toString(b));
+ }
+ // The numeric decoders throw NumberFormatException when the decoded text does not parse.
+ @Override
+ public int decodeInteger(Bytes b) {
+ return Integer.parseInt(decodeString(b));
+ }
+
+ @Override
+ public long decodeLong(Bytes b) {
+ return Long.parseLong(decodeString(b));
+ }
+ /** All other decode methods start here; returns the result of calling toString on the Bytes. */
+ @Override
+ public String decodeString(Bytes b) {
+ return b.toString();
+ }
+
+ @Override
+ public float decodeFloat(Bytes b) {
+ return Float.parseFloat(decodeString(b));
+ }
+
+ @Override
+ public double decodeDouble(Bytes b) {
+ return Double.parseDouble(decodeString(b));
+ }
+ // Boolean.parseBoolean never throws: any text other than true (ignoring case) decodes to false.
+ @Override
+ public boolean decodeBoolean(Bytes b) {
+ return Boolean.parseBoolean(decodeString(b));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
new file mode 100644
index 0000000..82639f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypeLayer.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * A simple convenience layer for Fluo. This layer attempts to make the following common operations
+ * easier.
+ *
+ * <UL>
+ * <LI>Working with different types.
+ * <LI>Supplying default values
+ * <LI>Dealing with null return types.
+ * <LI>Working with row/column and column maps
+ * </UL>
+ *
+ * <p>
+ * This layer was intentionally loosely coupled with the basic API. This allows other convenience
+ * layers for Fluo to build directly on the basic API w/o having to consider the particulars of this
+ * layer. Also, it's expected that integration with other languages may only use the basic API.
+ * </p>
+ *
+ * <h3>Using</h3>
+ *
+ * <p>
+ * A TypeLayer is created with a certain encoder that is used for converting from bytes to
+ * primitives and vice versa. In order to ensure that all of your code uses the same encoder,
+ * it's probably best to centralize the choice of an encoder within your project. There are many
+ * ways to do this; below is an example of one way to centralize and use it.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * public class MyTypeLayer extends TypeLayer {
+ * public MyTypeLayer() {
+ * super(new MyEncoder());
+ * }
+ * }
+ *
+ * public class MyObserver extends TypedObserver {
+ * MyObserver(){
+ * super(new MyTypeLayer());
+ * }
+ *
+ * public void process(TypedTransactionBase tx, Bytes row, Column col){
+ * //do something w/ typed transaction
+ * }
+ * }
+ *
+ * public class MyUtil {
+ * //A little util to print out some stuff
+ * public void printStuff(Snapshot snap, byte[] row){
+ * TypedSnapshot tsnap = new MyTypeLayer().wrap(snap);
+ *
+ * System.out.println(tsnap.get().row(row).fam("b90000").qual(137).toString("NP"));
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Working with different types</h3>
+ *
+ * <p>
+ * The following example code shows using the basic fluo API with different types.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(Transaction tx, byte[] row, byte[] cf, int cq, long val){
+ * tx.set(Bytes.of(row), new Column(Bytes.of(cf), Bytes.of(Integer.toString(cq))),
+ * Bytes.of(Long.toString(val)));
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. Because row(), fam(), qual(), and set() each take many different types, this
+ * enables many different permutations that would not be achievable with overloading.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(TypedTransaction tx, byte[] r, byte[] cf, int cq, long v){
+ * tx.mutate().row(r).fam(cf).qual(cq).set(v);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Default values</h3>
+ *
+ * <p>
+ * The following example code shows using the basic fluo API to read a value and default to zero if
+ * it does not exist.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(Transaction tx, byte[] row, Column col, long amount){
+ *
+ * long balance = 0;
+ * Bytes bval = tx.get(Bytes.of(row), col);
+ * if(bval != null)
+ * balance = Long.parseLong(bval.toString());
+ *
+ * balance += amount;
+ *
+ * tx.set(Bytes.of(row), col, Bytes.of(Long.toString(balance)));
+ *
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. This code avoids the null check by supplying a default value of zero.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ * long balance = tx.get().row(r).col(c).toLong(0);
+ * balance += amount;
+ * tx.mutate().row(r).col(c).set(balance);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * For this particular case, shorter code can be written by using the increment method.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ * tx.mutate().row(r).col(c).increment(amount);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Null return types</h3>
+ *
+ * <p>
+ * When using the basic API, you must ensure the returned value is not null before converting it
+ * to a string or long.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(Transaction tx, byte[] row, Column col, long amount) {
+ * Bytes val = tx.get(Bytes.of(row), col);
+ * if(val == null)
+ * return;
+ * long balance = Long.parseLong(val.toString());
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * With {@link TypedTransactionBase} if no default value is supplied, then the null is passed
+ * through.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(TypedTransaction tx, byte[] r, Column c, long amount){
+ * Long balance = tx.get().row(r).col(c).toLong();
+ * if(balance == null)
+ * return;
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Defaulted maps</h3>
+ *
+ * <p>
+ * The operations that return maps, return defaulted maps which make it easy to specify defaults and
+ * avoid null.
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * // pretend this method has curly braces. javadoc has issues with less than.
+ *
+ * void process(TypedTransaction tx, byte[] r, Column c1, Column c2, Column c3, long amount)
+ *
+ * Map<Column, Value> columns = tx.get().row(r).columns(c1,c2,c3);
+ *
+ * // If c1 does not exist in map, a Value that wraps null will be returned.
+ * // When c1 does not exist val1 will be set to null and no NPE will be thrown.
+ * String val1 = columns.get(c1).toString();
+ *
+ * // If c2 does not exist in map, then val2 will be set to empty string.
+ * String val2 = columns.get(c2).toString("");
+ *
+ * // If c3 does not exist in map, then val3 will be set to 9.
+ * Long val3 = columns.get(c3).toLong(9);
+ * }
+ * </pre>
+ *
+ * <p>
+ * This also applies to getting sets of rows.
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * // pretend this method has curly braces. javadoc has issues with less than.
+ *
+ * void process(TypedTransaction tx, List<String> rows, Column c1, Column c2, Column c3,
+ * long amount)
+ *
+ * Map<String,Map<Column,Value>> rowCols =
+ * tx.get().rowsString(rows).columns(c1,c2,c3).toStringMap();
+ *
+ * // this will set val1 to null if row does not exist in map and/or column does not
+ * // exist in child map
+ * String val1 = rowCols.get("row1").get(c1).toString();
+ * }
+ * </pre>
+ *
+ * @since 1.0.0
+ */
+public class TypeLayer {
+
+  private final Encoder encoder; // set once; used by the String/int/long overloads
+
+  static class Data { // mutable holder for the parts collected along one call chain
+    Bytes row;
+    Bytes family;
+    Bytes qual;
+    Bytes vis;
+
+    Column getCol() { // builds a Column from only the parts that have been set
+      if (qual == null) { // vis is not consulted on this path
+        return new Column(family);
+      }
+      if (vis == null) {
+        return new Column(family, qual);
+      }
+      return new Column(family, qual, vis);
+    }
+  }
+
+  /** Start of a call chain: accepts a row in any supported type and yields {@code R}.
+   * @since 1.0.0
+   */
+  public abstract class RowMethods<R> {
+
+    abstract R create(Data data);
+
+    public R row(String row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(int row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(long row) {
+      return row(encoder.encode(row));
+    }
+
+    public R row(byte[] row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(ByteBuffer row) {
+      return row(Bytes.of(row));
+    }
+
+    public R row(Bytes row) {
+      // each call starts from a fresh Data, so separate chains do not share state
+      Data fresh = new Data();
+      fresh.row = row;
+      return create(fresh);
+    }
+  }
+
+  /** Sets the column family, given in any supported type, and yields {@code R1}.
+   * @since 1.0.0
+   */
+  public abstract class SimpleFamilyMethods<R1> {
+
+    Data data; // not private: FamilyMethods.col() fills it in as well
+
+    SimpleFamilyMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R1 create1(Data data);
+
+    public R1 fam(String family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(int family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(long family) {
+      return fam(encoder.encode(family));
+    }
+
+    public R1 fam(byte[] family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(ByteBuffer family) {
+      return fam(Bytes.of(family));
+    }
+
+    public R1 fam(Bytes family) {
+      data.family = family;
+      return create1(data);
+    }
+  }
+
+  /** Adds {@link #col(Column)}, which sets family, qualifier and visibility in one step.
+   * @since 1.0.0
+   */
+  public abstract class FamilyMethods<R1, R2> extends SimpleFamilyMethods<R1> {
+
+    FamilyMethods(Data data) {
+      super(data);
+    }
+
+    abstract R2 create2(Data data);
+
+    public R2 col(Column col) {
+      data.family = col.getFamily();
+      data.qual = col.getQualifier();
+      data.vis = col.getVisibility();
+      return create2(data);
+    }
+  }
+
+  /** Sets the column qualifier, given in any supported type, and yields {@code R}.
+   * @since 1.0.0
+   */
+  public abstract class QualifierMethods<R> {
+
+    private final Data data;
+
+    QualifierMethods(Data data) {
+      this.data = data;
+    }
+
+    abstract R create(Data data);
+
+    public R qual(String qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(int qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(long qualifier) {
+      return qual(encoder.encode(qualifier));
+    }
+
+    public R qual(byte[] qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(ByteBuffer qualifier) {
+      return qual(Bytes.of(qualifier));
+    }
+
+    public R qual(Bytes qualifier) {
+      data.qual = qualifier;
+      return create(data);
+    }
+  }
+
+  /** Last step of the column builder: returns the {@link Column}, with or without a visibility.
+   * @since 1.0.0
+   */
+  public static class VisibilityMethods {
+
+    private final Data data;
+
+    VisibilityMethods(Data data) {
+      this.data = data;
+    }
+
+    public Column vis() {
+      return new Column(data.family, data.qual);
+    }
+
+    public Column vis(String cv) {
+      return vis(Bytes.of(cv)); // Bytes.of directly: this static class cannot reach the encoder
+    }
+
+    public Column vis(Bytes cv) {
+      return new Column(data.family, data.qual, cv);
+    }
+
+    public Column vis(ByteBuffer cv) {
+      return vis(Bytes.of(cv));
+    }
+
+    public Column vis(byte[] cv) {
+      return vis(Bytes.of(cv));
+    }
+  }
+
+  /** Column builder step that sets the qualifier and moves on to {@link VisibilityMethods}.
+   * @since 1.0.0
+   */
+  public class CQB extends QualifierMethods<VisibilityMethods> {
+    CQB(Data data) {
+      super(data);
+    }
+
+    @Override
+    VisibilityMethods create(Data data) {
+      return new VisibilityMethods(data);
+    }
+  }
+
+  /** Column builder step that sets the family and moves on to {@link CQB}.
+   * @since 1.0.0
+   */
+  public class CFB extends SimpleFamilyMethods<CQB> {
+    CFB() {
+      super(new Data());
+    }
+
+    @Override
+    CQB create1(Data data) {
+      return new CQB(data);
+    }
+  }
+
+  public TypeLayer(Encoder encoder) { // the encoder used by every builder and wrapper made here
+    this.encoder = encoder;
+  }
+
+  /**
+   * Initiates the chain of calls needed to build a column, e.g.
+   * {@code bc().fam("f").qual("q").vis()}.
+   * @return a column builder
+   */
+  public CFB bc() {
+    return new CFB();
+  }
+
+  public TypedSnapshot wrap(Snapshot snap) {
+    return new TypedSnapshot(snap, encoder, this);
+  }
+
+  public TypedTransactionBase wrap(TransactionBase tx) {
+    return new TypedTransactionBase(tx, encoder, this);
+  }
+
+  public TypedTransaction wrap(Transaction tx) {
+    return new TypedTransaction(tx, encoder, this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
new file mode 100644
index 0000000..0e14062
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+
+/**
+ * A {@link Loader} that hands subclasses a {@link TypedTransactionBase} instead of the raw
+ * transaction. Defaults to a {@link TypeLayer} backed by {@link StringEncoder}.
+ * @since 1.0.0
+ */
+public abstract class TypedLoader implements Loader {
+
+  private final TypeLayer typeLayer;
+
+  public TypedLoader() {
+    this(new TypeLayer(new StringEncoder()));
+  }
+
+  public TypedLoader(TypeLayer tl) {
+    typeLayer = tl;
+  }
+
+  @Override
+  public void load(TransactionBase tx, Context context) throws Exception {
+    load(typeLayer.wrap(tx), context); // dispatches to the typed overload below
+  }
+
+  /** Typed counterpart of {@link #load(TransactionBase, Context)}; receives the wrapped tx. */
+  public abstract void load(TypedTransactionBase tx, Context context) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
new file mode 100644
index 0000000..ca68285
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedObserver.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * An {@link AbstractObserver} that passes subclasses a {@link TypedTransactionBase} in place of
+ * the raw transaction. Defaults to a {@link TypeLayer} backed by {@link StringEncoder}.
+ * @since 1.0.0
+ */
+public abstract class TypedObserver extends AbstractObserver {
+
+  private final TypeLayer typeLayer;
+
+  public TypedObserver() {
+    this(new TypeLayer(new StringEncoder()));
+  }
+
+  public TypedObserver(TypeLayer tl) {
+    typeLayer = tl;
+  }
+
+  @Override
+  public void process(TransactionBase tx, Bytes row, Column col) {
+    process(typeLayer.wrap(tx), row, col); // dispatches to the typed overload below
+  }
+
+  public abstract void process(TypedTransactionBase tx, Bytes row, Column col);
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
new file mode 100644
index 0000000..6c09764
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshot.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+/**
+ * A {@link Snapshot} built on {@link TypedSnapshotBase}; {@link #close()} is forwarded to the
+ * wrapped snapshot. Instances are handed out by {@link TypeLayer#wrap(Snapshot)}.
+ * @since 1.0.0
+ */
+public class TypedSnapshot extends TypedSnapshotBase implements Snapshot {
+
+  private final Snapshot wrapped;
+
+  TypedSnapshot(Snapshot snapshot, Encoder encoder, TypeLayer tl) {
+    super(snapshot, encoder, tl);
+    this.wrapped = snapshot; // kept here so close() can reach the underlying snapshot
+  }
+
+  @Override
+  public void close() {
+    wrapped.close();
+  }
+}