You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/09/02 02:57:57 UTC

incubator-fluo git commit: Fixes #758

Repository: incubator-fluo
Updated Branches:
  refs/heads/master 468d1cfc4 -> 48395d1a3


Fixes #758

* Modified get(Collection<RowColumn> rowColumns) return type.

Closes #760


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/48395d1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/48395d1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/48395d1a

Branch: refs/heads/master
Commit: 48395d1a31ba2740397fee3c99e80e75e95fe662
Parents: 468d1cf
Author: Christopher McTague <cj...@vwc.edu>
Authored: Thu Aug 11 13:35:18 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Sep 1 22:48:27 2016 -0400

----------------------------------------------------------------------
 .../org/apache/fluo/api/client/SnapshotBase.java     |  8 ++++----
 .../org/apache/fluo/core/impl/TransactionImpl.java   | 13 ++++++++-----
 .../java/org/apache/fluo/core/impl/TxStringUtil.java |  6 +++---
 .../org/apache/fluo/core/log/TracingTransaction.java | 14 ++++++++++----
 .../org/apache/fluo/integration/TestTransaction.java |  4 ++--
 .../fluo/integration/impl/ParallelScannerIT.java     | 15 ++++++++-------
 .../java/org/apache/fluo/integration/log/LogIT.java  |  2 +-
 7 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index b7942d4..4dcc84d 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -51,10 +51,10 @@ public interface SnapshotBase {
   Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns);
 
   /**
-   * Given a collection of {@link RowColumn}s, retrieves a map contains the values at those rows and
-   * {@link Column}s. Only rows and columns that exists will be returned in map.
+   * Given a collection of {@link RowColumn}s, retrieves a map containing the values at those
+   * {@link RowColumn}s. Only rows and columns that exist will be returned in the map.
    */
-  Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns);
+  Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns);
 
   /**
    * This method is the starting point for constructing a scanner. Scanners can be constructed over
@@ -99,7 +99,7 @@ public interface SnapshotBase {
    * Wrapper for {@link #get(Collection)} that uses Strings. All strings are encoded and decoded
    * using UTF-8.
    */
-  Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns);
+  Map<RowColumn, String> gets(Collection<RowColumn> rowColumns);
 
   /**
    * Wrapper for {@link #get(Collection, Set)} that uses Strings. All strings are encoded and

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 803e672..bbda32a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -191,7 +191,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
   }
 
   @Override
-  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
     checkIfOpen();
 
     if (rowColumns.size() == 0) {
@@ -200,10 +200,14 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
 
     ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats);
 
-    Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
+    Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
+    Map<RowColumn, Bytes> ret = new HashMap<>();
 
-    for (Entry<Bytes, Map<Column, Bytes>> entry : ret.entrySet()) {
+    for (Entry<Bytes, Map<Column, Bytes>> entry : scan.entrySet()) {
       updateColumnsRead(entry.getKey(), entry.getValue().keySet());
+      for (Entry<Column, Bytes> colVal : entry.getValue().entrySet()) {
+        ret.put(new RowColumn(entry.getKey(), colVal.getKey()), colVal.getValue());
+      }
     }
 
     return ret;
@@ -670,7 +674,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
   }
 
   @Override
-  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+  public Map<RowColumn, String> gets(Collection<RowColumn> rowColumns) {
     return TxStringUtil.gets(this, rowColumns);
   }
 
@@ -1064,7 +1068,6 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
         commmitPrimary(cd, commitTs);
       }
     }, env.getSharedResources().getAsyncCommitExecutor());
-
   }
 
   private void commmitPrimary(CommitData cd, final long commitTs) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStringUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStringUtil.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStringUtil.java
index a6111cf..ec1759a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStringUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStringUtil.java
@@ -56,8 +56,8 @@ public class TxStringUtil {
     return transform(snapshot.get(Collections2.transform(rows, s -> Bytes.of(s)), columns));
   }
 
-  public static Map<String, Map<Column, String>> gets(SnapshotBase snapshot,
-      Collection<RowColumn> rowColumns) {
-    return transform(snapshot.get(rowColumns));
+  public static Map<RowColumn, String> gets(SnapshotBase snapshot, Collection<RowColumn> rowColumns) {
+    Map<RowColumn, Bytes> bytesMap = snapshot.get(rowColumns);
+    return Maps.transformValues(bytesMap, b -> b.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index a1020b9..ab2e429 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -80,6 +80,12 @@ public class TracingTransaction implements AsyncTransaction, Snapshot {
         + "=" + encC(e.getValue())));
   }
 
+  private String encRCM(Map<RowColumn, Bytes> ret) {
+    return Iterators.toString(Iterators.transform(ret.entrySet().iterator(),
+        e -> Hex.encNonAscii(e.getKey()) + "=" + enc(e.getValue())));
+  }
+
+
   private String encC(Collection<Column> columns) {
     return Iterators.toString(Iterators.transform(columns.iterator(), Hex::encNonAscii));
   }
@@ -139,10 +145,10 @@ public class TracingTransaction implements AsyncTransaction, Snapshot {
   }
 
   @Override
-  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
-    Map<Bytes, Map<Column, Bytes>> ret = tx.get(rowColumns);
+  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
+    Map<RowColumn, Bytes> ret = tx.get(rowColumns);
     if (log.isTraceEnabled()) {
-      log.trace("txid: {} get({}) -> {}", txid, encRC(rowColumns), encRC(ret));
+      log.trace("txid: {} get({}) -> {}", txid, encRC(rowColumns), encRCM(ret));
     }
     return ret;
   }
@@ -264,7 +270,7 @@ public class TracingTransaction implements AsyncTransaction, Snapshot {
   }
 
   @Override
-  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+  public Map<RowColumn, String> gets(Collection<RowColumn> rowColumns) {
     return TxStringUtil.gets(this, rowColumns);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index 62b3ebe..8661f45 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -200,7 +200,7 @@ public class TestTransaction implements TransactionBase {
   }
 
   @Override
-  public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+  public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
     return tx.get(rowColumns);
   }
 
@@ -210,7 +210,7 @@ public class TestTransaction implements TransactionBase {
   }
 
   @Override
-  public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+  public Map<RowColumn, String> gets(Collection<RowColumn> rowColumns) {
     return tx.gets(rowColumns);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
index 554a762..63e0807 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
@@ -17,9 +17,9 @@ package org.apache.fluo.integration.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
@@ -57,15 +57,16 @@ public class ParallelScannerIT extends ITBaseImpl {
     newEdges.add(new RowColumn("node8", new Column("edge", "node3")));
     newEdges.add(new RowColumn("node5", new Column("edge", "node7")));
 
-    Map<String, Map<Column, String>> existing = tx2.gets(newEdges);
+    Map<RowColumn, String> existing = tx2.gets(newEdges);
 
     tx2.done();
 
-    Assert.assertEquals(ImmutableSet.of("node1", "node5"), existing.keySet());
-    Assert.assertEquals(ImmutableSet.of(new Column("edge", "node3")), existing.get("node1")
-        .keySet());
-    Assert.assertEquals(ImmutableSet.of(new Column("edge", "node2"), new Column("edge", "node7")),
-        existing.get("node5").keySet());
+    HashSet<RowColumn> expected = new HashSet<>();
+    expected.add(new RowColumn("node1", new Column("edge", "node3")));
+    expected.add(new RowColumn("node5", new Column("edge", "node2")));
+    expected.add(new RowColumn("node5", new Column("edge", "node7")));
+
+    Assert.assertEquals(expected, existing.keySet());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/48395d1a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 7f76d0a..bb8ed69 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -342,7 +342,7 @@ public class LogIT extends ITBaseMini {
     String logMsgs = origLogMsgs.replace('\n', ' ');
 
     String pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 \\Qget([r1 f1 q1 , r2 f1 q2 ]) -> [r1=[f1 q1 =v1], r2=[f1 q2 =v4]]\\E";
+    pattern += ".*txid: \\1 \\Qget([r1 f1 q1 , r2 f1 q2 ]) -> [r2 f1 q2 =v4, r1 f1 q1 =v1]\\E";
     pattern += ".*txid: \\1 \\Qget([r1, r2], [f1 q1 ]) -> [r1=[f1 q1 =v1], r2=[f1 q1 =v3]]\\E";
     pattern += ".*txid: \\1 \\Qget(r1, [f1 q1 , f1 q2 ]) -> [f1 q1 =v1, f1 q2 =v2]\\E";
     pattern += ".*txid: \\1 \\Qget(r1, f1 q1 ) -> v1\\E";