You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2017/04/28 21:52:45 UTC

incubator-fluo-recipes git commit: Reduced creation of intermediate String and byte arrays

Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 4f11ec4d9 -> 0cda5ae9c


Reduced creation of intermediate String and byte arrays


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/0cda5ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/0cda5ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/0cda5ae9

Branch: refs/heads/master
Commit: 0cda5ae9ccb43497e43ef48d8651dfd969e5d085
Parents: 4f11ec4
Author: Keith Turner <kt...@apache.org>
Authored: Wed Apr 26 18:21:27 2017 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Apr 26 19:26:56 2017 -0400

----------------------------------------------------------------------
 .../fluo/recipes/core/export/ExportBucket.java  | 41 +++++++++---------
 .../fluo/recipes/core/map/CollisionFreeMap.java | 44 ++++++++++----------
 pom.xml                                         |  2 +-
 3 files changed, 44 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/0cda5ae9/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
index 8100b37..4882ee1 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
@@ -90,17 +91,15 @@ class ExportBucket {
     this.qid = bucketRow.subSequence(0, colonLoc).toString();
   }
 
-  private static byte[] encSeq(long l) {
-    byte[] ret = new byte[8];
-    ret[0] = (byte) (l >>> 56);
-    ret[1] = (byte) (l >>> 48);
-    ret[2] = (byte) (l >>> 40);
-    ret[3] = (byte) (l >>> 32);
-    ret[4] = (byte) (l >>> 24);
-    ret[5] = (byte) (l >>> 16);
-    ret[6] = (byte) (l >>> 8);
-    ret[7] = (byte) (l >>> 0);
-    return ret;
+  private static void encSeq(BytesBuilder bb, long l) {
+    bb.append((byte) (l >>> 56));
+    bb.append((byte) (l >>> 48));
+    bb.append((byte) (l >>> 40));
+    bb.append((byte) (l >>> 32));
+    bb.append((byte) (l >>> 24));
+    bb.append((byte) (l >>> 16));
+    bb.append((byte) (l >>> 8));
+    bb.append((byte) (l >>> 0));
   }
 
   private static long decodeSeq(Bytes seq) {
@@ -111,17 +110,18 @@ class ExportBucket {
   }
 
   public void add(long seq, byte[] key, byte[] value) {
-    Bytes row =
-        Bytes.builder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":")
-            .append(key).append(encSeq(seq)).toBytes();
-    ttx.set(row, EXPORT_COL, Bytes.of(value));
+    BytesBuilder builder =
+        Bytes.builder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(':')
+            .append(key);
+    encSeq(builder, seq);
+    ttx.set(builder.toBytes(), EXPORT_COL, Bytes.of(value));
   }
 
   /**
    * Computes the minimal row for a bucket
    */
   private Bytes getMinimalRow() {
-    return Bytes.builder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes();
+    return Bytes.builder(bucketRow.length() + 1).append(bucketRow).append(':').toBytes();
   }
 
   public void notifyExportObserver() {
@@ -190,10 +190,11 @@ class ExportBucket {
   }
 
   public void setContinueRow(ExportEntry ee) {
-    Bytes nextRow =
-        Bytes.builder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":")
-            .append(ee.key).append(encSeq(ee.seq)).toBytes();
-
+    BytesBuilder builder =
+        Bytes.builder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(':')
+            .append(ee.key);
+    encSeq(builder, ee.seq);
+    Bytes nextRow = builder.toBytes();
     ttx.set(getMinimalRow(), NEXT_COL, nextRow);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/0cda5ae9/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
index 939e9ed..0a05919 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
@@ -62,7 +62,9 @@ public class CollisionFreeMap<K, V> {
 
   private static final String DATA_RANGE_END = ":d:~";
 
-  private String mapId;
+  private Bytes updatePrefix;
+  private Bytes dataPrefix;
+  private Column notifyColumn;
 
   private Class<K> keyType;
   private Class<V> valType;
@@ -78,8 +80,9 @@ public class CollisionFreeMap<K, V> {
 
   @SuppressWarnings("unchecked")
   CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
-
-    this.mapId = opts.mapId;
+    this.updatePrefix = Bytes.of(opts.mapId + ":u:");
+    this.dataPrefix = Bytes.of(opts.mapId + ":d:");
+    this.notifyColumn = new Column("fluoRecipes", "cfm:" + opts.mapId);
     // TODO defer loading classes
     // TODO centralize class loading
     // TODO try to check type params
@@ -109,6 +112,8 @@ public class CollisionFreeMap<K, V> {
 
   void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
 
+    Preconditions.checkState(ntfyRow.startsWith(updatePrefix));
+
     Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
 
     Span span;
@@ -173,7 +178,7 @@ public class CollisionFreeMap<K, V> {
         } else {
           // start next time at the next possible key
           Bytes nextPossible =
-              Bytes.builder(lastKey.length() + 1).append(lastKey).append(new byte[] {0}).toBytes();
+              Bytes.builder(lastKey.length() + 1).append(lastKey).append(0).toBytes();
           tx.set(ntfyRow, NEXT_COL, nextPossible);
         }
 
@@ -194,12 +199,11 @@ public class CollisionFreeMap<K, V> {
       tx.setWeakNotification(ntfyRow, col);
     }
 
-    byte[] dataPrefix = ntfyRow.toArray();
-    // TODO this is awful... no sanity check... hard to read
-    dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
+
 
     BytesBuilder rowBuilder = Bytes.builder();
     rowBuilder.append(dataPrefix);
+    rowBuilder.append(ntfyRow.subSequence(updatePrefix.length(), ntfyRow.length()));
     int rowPrefixLen = rowBuilder.getLength();
 
     Set<Bytes> keysToFetch = updates.keySet();
@@ -294,7 +298,7 @@ public class CollisionFreeMap<K, V> {
 
 
     BytesBuilder rowBuilder = Bytes.builder();
-    rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k);
+    rowBuilder.append(updatePrefix).append(bucketId).append(':').append(k);
 
     Iterator<RowColumnValue> iter =
         tx.scanner().over(Span.prefix(rowBuilder.toBytes())).build().iterator();
@@ -307,8 +311,8 @@ public class CollisionFreeMap<K, V> {
       ui = Collections.<V>emptyList().iterator();
     }
 
-    rowBuilder.setLength(mapId.length());
-    rowBuilder.append(":d:").append(bucketId).append(":").append(k);
+    rowBuilder.setLength(0);
+    rowBuilder.append(dataPrefix).append(bucketId).append(':').append(k);
 
     Bytes dataRow = rowBuilder.toBytes();
 
@@ -325,10 +329,6 @@ public class CollisionFreeMap<K, V> {
     return combiner.combine(key, concat(ui, cv)).orElse(null);
   }
 
-  String getId() {
-    return mapId;
-  }
-
   /**
    * Queues updates for a collision free map. These updates will be made by an Observer executing
    * another transaction. This method will not collide with other transaction queuing updates for
@@ -344,7 +344,7 @@ public class CollisionFreeMap<K, V> {
     Set<String> buckets = new HashSet<>();
 
     BytesBuilder rowBuilder = Bytes.builder();
-    rowBuilder.append(mapId).append(":u:");
+    rowBuilder.append(updatePrefix);
     int prefixLength = rowBuilder.getLength();
 
     byte[] startTs = encSeq(tx.getStartTimestamp());
@@ -357,7 +357,7 @@ public class CollisionFreeMap<K, V> {
       // reset to the common row prefix
       rowBuilder.setLength(prefixLength);
 
-      Bytes row = rowBuilder.append(bucketId).append(":").append(k).append(startTs).toBytes();
+      Bytes row = rowBuilder.append(bucketId).append(':').append(k).append(startTs).toBytes();
       Bytes val = Bytes.of(serializer.serialize(entry.getValue()));
 
       // TODO set if not exists would be comforting here.... but
@@ -369,11 +369,11 @@ public class CollisionFreeMap<K, V> {
 
     for (String bucketId : buckets) {
       rowBuilder.setLength(prefixLength);
-      rowBuilder.append(bucketId).append(":");
+      rowBuilder.append(bucketId).append(':');
 
       Bytes row = rowBuilder.toBytes();
 
-      tx.setWeakNotification(row, new Column("fluoRecipes", "cfm:" + mapId));
+      tx.setWeakNotification(row, notifyColumn);
     }
   }
 
@@ -418,14 +418,14 @@ public class CollisionFreeMap<K, V> {
 
     private static final long serialVersionUID = 1L;
 
-    private String mapId;
+    private Bytes dataPrefix;
 
     private SimpleSerializer serializer;
 
     private int numBuckets = -1;
 
     private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
-      this.mapId = mapId;
+      this.dataPrefix = Bytes.of(mapId + ":d:");
       this.numBuckets = numBuckets;
       this.serializer = serializer;
     }
@@ -435,8 +435,8 @@ public class CollisionFreeMap<K, V> {
       int hash = Hashing.murmur3_32().hashBytes(k).asInt();
       String bucketId = genBucketId(Math.abs(hash % numBuckets), numBuckets);
 
-      BytesBuilder bb = Bytes.builder();
-      Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes();
+      BytesBuilder bb = Bytes.builder(dataPrefix.length() + bucketId.length() + 1 + k.length);
+      Bytes row = bb.append(dataPrefix).append(bucketId).append(':').append(k).toBytes();
       byte[] v = serializer.serialize(val);
 
       return new RowColumnValue(row, DATA_COLUMN, Bytes.of(v));

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/0cda5ae9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3332077..05a2480 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
     <accumulo.version>1.6.6</accumulo.version>
     <curator.version>2.7.1</curator.version>
     <findbugs.maxRank>13</findbugs.maxRank>
-    <fluo.version>1.0.0-incubating</fluo.version>
+    <fluo.version>1.1.0-incubating-SNAPSHOT</fluo.version>
     <hadoop.version>2.6.3</hadoop.version>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>