You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2018/11/03 21:28:49 UTC

[arrow] branch master updated: ARROW-3648: [Plasma][Java] Add API to get metadata and data at the same time

This is an automated email from the ASF dual-hosted git repository.

robertnishihara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 756f645  ARROW-3648: [Plasma][Java] Add API to get metadata and data at the same time
756f645 is described below

commit 756f64536c7b43d42ea940b34c9551a9ddac9ccc
Author: Yuhong Guo <yu...@antfin.com>
AuthorDate: Sat Nov 3 14:28:37 2018 -0700

    ARROW-3648: [Plasma][Java] Add API to get metadata and data at the same time
    
    The current Arrow Java Plasma client has no API to get an object's metadata and data together in one call. If this is split into two API calls, the object's status can change between them: we have observed the first call returning empty (object not yet stored) while the second call succeeds, so the metadata and data do not match.
    
    Author: Yuhong Guo <yu...@antfin.com>
    
    Closes #2862 from guoyuhong/javaClientMetaTest and squashes the following commits:
    
    bd10550f <Yuhong Guo> Use self-defined data struct
    58f80c80 <Yuhong Guo> Remove Pair.of
    b5fba3b6 <Yuhong Guo> Fix POM
    9ecf2d6c <Yuhong Guo> Fix
    2f597090 <Yuhong Guo> Add dependency to pom
    f987cbce <Yuhong Guo> Lint
    8b1519b8 <Yuhong Guo> Add Support for reading once for data and meta for Plasma java client
    f7462109 <Yuhong Guo> Lint
    97ff76ec <Yuhong Guo> Add test case to cover Java Plasma Client Test
---
 java/plasma/pom.xml                                | 22 +++++++-------
 .../org/apache/arrow/plasma/ObjectStoreLink.java   | 25 ++++++++++++++--
 .../java/org/apache/arrow/plasma/PlasmaClient.java | 28 ++++++++++++++++-
 .../org/apache/arrow/plasma/PlasmaClientTest.java  | 35 ++++++++++++++++++++++
 4 files changed, 96 insertions(+), 14 deletions(-)

diff --git a/java/plasma/pom.xml b/java/plasma/pom.xml
index be0d700..d50171a 100644
--- a/java/plasma/pom.xml
+++ b/java/plasma/pom.xml
@@ -18,16 +18,16 @@
     </parent>
     <artifactId>arrow-plasma</artifactId>
     <name>Arrow Plasma Client</name>
-    <build>  
-        <plugins>  
-            <plugin>  
-                <groupId>org.apache.maven.plugins</groupId>  
-                <artifactId>maven-compiler-plugin</artifactId>  
-                <configuration>  
-                    <source>1.8</source>  
-                    <target>1.8</target>  
-                </configuration>  
-            </plugin>  
-        </plugins>  
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
     </build>
 </project>
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
index a328eec..d4371d2 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
@@ -24,6 +24,17 @@ import java.util.List;
  */
 public interface ObjectStoreLink {
 
+  class ObjectStoreData {
+    // Holds one object's buffers. Note: the constructor takes (metadata, data), in that order.
+    ObjectStoreData(byte[] metadata, byte[] data) {
+      this.data = data;
+      this.metadata = metadata;
+    }
+    // Either array may be null; both are stored as given, without defensive copies.
+    public final byte[] metadata;
+    public final byte[] data;
+  }
+
   /**
    * Put value in the local plasma store with object ID <tt>objectId</tt>.
    *
@@ -34,7 +45,7 @@ public interface ObjectStoreLink {
   void put(byte[] objectId, byte[] value, byte[] metadata);
 
   /**
-   * Create a buffer from the PlasmaStore based on the <tt>objectId</tt>.
+   * Get a buffer from the PlasmaStore based on the <tt>objectId</tt>.
    *
    * @param objectId The object ID used to identify the object.
    * @param timeoutMs The number of milliseconds that the get call should block before timing out
@@ -48,7 +59,7 @@ public interface ObjectStoreLink {
   }
 
   /**
-   * Create buffers from the PlasmaStore based on <tt>objectIds</tt>.
+   * Get buffers from the PlasmaStore based on <tt>objectIds</tt>.
    *
    * @param objectIds List of object IDs used to identify some objects.
    * @param timeoutMs The number of milliseconds that the get call should block before timing out
@@ -59,6 +70,16 @@ public interface ObjectStoreLink {
   List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata);
 
   /**
+   * Get data and metadata together, in a single store call, for each of the <tt>objectIds</tt>.
+   *
+   * @param objectIds List of object IDs used to identify some objects.
+   * @param timeoutMs The number of milliseconds that the get call should block before timing out
+   * and returning. Pass -1 if the call should block and 0 if the call should return immediately.
+   * @return One ObjectStoreData per ID, in order; its fields may be null if the object was not got.
+   */
+  List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs);
+
+  /**
    * Wait until <tt>numReturns</tt> objects in <tt>objectIds</tt> are ready.
    *
    * @param objectIds List of object IDs to wait for.
diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
index 3388ded..8257c9a 100644
--- a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
+++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.List;
 
 
-
 /**
  * The PlasmaClient is used to interface with a plasma store and manager.
  *
@@ -111,6 +110,33 @@ public class PlasmaClient implements ObjectStoreLink {
   }
 
   @Override
+  public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
+    ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
+    assert bufs.length == objectIds.length;
+
+    List<ObjectStoreData> ret = new ArrayList<>(bufs.length);
+    for (int i = 0; i < bufs.length; i++) {
+      ByteBuffer databuf = bufs[i][0];
+      ByteBuffer metabuf = bufs[i][1];
+      if (databuf == null) {
+        // No data buffer for this ID: report it with both fields null.
+        ret.add(new ObjectStoreData(null, null));
+      } else {
+        byte[] data = new byte[databuf.remaining()];
+        databuf.get(data);
+        byte[] meta = null;
+        if (metabuf != null) {
+          meta = new byte[metabuf.remaining()];
+          metabuf.get(meta);
+        }
+        // ObjectStoreData's constructor is (metadata, data); passing (data, meta) swapped them.
+        ret.add(new ObjectStoreData(meta, data));
+      }
+    }
+    return ret;
+  }
+
+  @Override
   public long evict(long numBytes) {
     return PlasmaClientJNI.evict(conn, numBytes);
   }
diff --git a/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java
index a6e4982..f36468b 100644
--- a/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java
+++ b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java
@@ -154,6 +154,41 @@ public class PlasmaClientTest {
     boolean notExsit = pLink.contains(id3);
     assert !notExsit;
     System.out.println("Plasma java client contains test success.");
+
+    byte[] id4 =  new byte[20];
+    Arrays.fill(id4, (byte)4);
+    byte[] value4 =  new byte[20];
+    byte[] meta4 = "META4".getBytes();
+    Arrays.fill(value4, (byte)14);
+    pLink.put(id4, value4, meta4);
+
+    byte[] id5 =  new byte[20];
+    Arrays.fill(id5, (byte)5);
+    byte[] value5 =  new byte[20];
+    byte[] meta5 = "META5".getBytes();
+    Arrays.fill(value5, (byte)15);
+    pLink.put(id5, value5, meta5);
+
+    byte[] getMeta4 = pLink.get(id4, timeoutMs, true);
+    assert Arrays.equals(meta4, getMeta4);
+    byte[] getValue4 = pLink.get(id4, timeoutMs, false);
+    assert Arrays.equals(value4, getValue4);
+    byte[][] ids4 = new byte[1][];
+    ids4[0] = id4;
+    ObjectStoreLink.ObjectStoreData fullData4 = pLink.get(ids4, timeoutMs).get(0);
+    assert Arrays.equals(meta4, fullData4.metadata);
+    assert Arrays.equals(value4, fullData4.data);
+
+    byte[] getMeta5 = pLink.get(id5, timeoutMs, true);
+    assert Arrays.equals(meta5, getMeta5);
+    byte[] getValue5 = pLink.get(id5, timeoutMs, false);
+    assert Arrays.equals(value5, getValue5);
+    byte[][] ids5 = new byte[1][];
+    ids5[0] = id5;
+    ObjectStoreLink.ObjectStoreData fullData5 = pLink.get(ids5, timeoutMs).get(0);
+    assert Arrays.equals(meta5, fullData5.metadata);
+    assert Arrays.equals(value5, fullData5.data);
+    System.out.println("Plasma java client metadata get test success.");
     cleanup();
     System.out.println("All test success.");