You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2019/07/04 05:52:13 UTC

[arrow] 34/38: ARROW-5658: [JAVA] Sync schema for VectorSchemaRoot

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 37f92001985fc455409b11b0d9d90f34437c2e5a
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Tue Jul 2 21:48:06 2019 -0700

    ARROW-5658: [JAVA] Sync schema for VectorSchemaRoot
    
    Resolve JIRA ARROW-5658.
    
    The fundamental problem is that, as data are inserted into a vector (e.g. ListVector), the schema of the VectorSchemaRoot can become different from the actual vector structure.
    
    On the server side, deserialization is based on the schema, which is out of date, so it fails silently.
    
    In this PR, we fix the problem of obsolete schema and make the server print error info explicitly.
    
    Author: liyafan82 <fa...@foxmail.com>
    
    Closes #4689 from liyafan82/fly_5658 and squashes the following commits:
    
    e7d5865a5 <liyafan82>  Undo throwing exception
    3ea77bc76 <liyafan82>  Replace automatic updating schema with throwing an exception
    cb9da2032 <liyafan82> Merge branch 'master' into fly_5658
    47a776fbe <liyafan82>  Automatically update schema
    6d2763848 <liyafan82>  Resolve comments
    061e8bc2f <liyafan82>  Fix error log
    e8ea49f00 <liyafan82>  Sync schema for VectorSchemaRoot
---
 .../org/apache/arrow/flight/FlightService.java     |  1 +
 .../org/apache/arrow/vector/VectorSchemaRoot.java  | 22 ++++-
 .../apache/arrow/vector/TestVectorSchemaRoot.java  | 93 ++++++++++++++++++++++
 3 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
index ee45cef..e805917 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
@@ -191,6 +191,7 @@ class FlightService extends FlightServiceImplBase {
             StreamPipe.wrap(responseObserver, PutResult::toProtocol)).run();
         responseObserver.onCompleted();
       } catch (Exception ex) {
+        logger.error("Failed to process custom put.", ex);
         responseObserver.onError(ex);
         // The client may have terminated, so the exception here is effectively swallowed.
         // Log the error as well so -something- makes it to the developer.
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
index 373e03c..a3fab14 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -35,7 +35,7 @@ import org.apache.arrow.vector.types.pojo.Schema;
  */
 public class VectorSchemaRoot implements AutoCloseable {
 
-  private final Schema schema;
+  private Schema schema;
   private int rowCount;
   private final List<FieldVector> fieldVectors;
   private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
@@ -206,4 +206,24 @@ public class VectorSchemaRoot implements AutoCloseable {
     }
     return sb.toString();
   }
+
+  /**
+   * Synchronizes the schema with the fields reported by the current vectors.
+   * In some cases, the schema and the actual vector structure may be different.
+   * This can be caused by a promoted writer (for details, please see
+   * {@link org.apache.arrow.vector.complex.impl.PromotableWriter}).
+   * For example, writing different types of data to a {@link org.apache.arrow.vector.complex.ListVector}
+   * may lead to such a case.
+   * When this happens, call this method to bring the schema and the vector structure back into sync.
+   * @return true if the schema is updated, false otherwise.
+   */
+  public boolean syncSchema() {
+    List<Field> oldFields = this.schema.getFields();
+    List<Field> newFields = this.fieldVectors.stream().map(ValueVector::getField).collect(Collectors.toList());
+    if (!oldFields.equals(newFields)) {
+      this.schema = new Schema(newFields);
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
index 480dcac..f9525f4 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorSchemaRoot.java
@@ -17,10 +17,24 @@
 
 package org.apache.arrow.vector;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,4 +89,83 @@ public class TestVectorSchemaRoot {
     assertEquals(vec2.getValueCount(), count);
     assertEquals(vsr.getRowCount(), count);
   }
+
+  private VectorSchemaRoot createBatch() {
+    FieldType varCharType = new FieldType(true, new ArrowType.Utf8(), /*dictionary=*/null);
+    FieldType listType = new FieldType(true, new ArrowType.List(), /*dictionary=*/null);
+
+    // create the schema: a single list column ("listCol") whose declared child is a varchar field
+    List<Field> schemaFields  = new ArrayList<>();
+    Field childField = new Field("varCharCol", varCharType, null);
+    List<Field> childFields = new ArrayList<>();
+    childFields.add(childField);
+    schemaFields.add(new Field("listCol", listType, childFields));
+    Schema schema = new Schema(schemaFields);
+
+    VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, allocator);
+    // get the list vector created for "listCol" and allocate its buffers
+    ListVector vector = (ListVector) schemaRoot.getVector("listCol");
+    vector.allocateNew();
+
+    // write data to the vector through its list writer
+    UnionListWriter writer = vector.getWriter();
+
+    writer.setPosition(0);
+
+    // start writing the outer list vector(0)
+    writer.startList();
+
+    // write the inner list vector(0)(0)
+    writer.list().startList();
+
+    // According to the schema above, the list element should have varchar type.
+    // When we write a big int, the original writer cannot handle this, so the writer will
+    // be promoted, and the vector structure will be different from the schema.
+    writer.list().bigInt().writeBigInt(0);
+    writer.list().bigInt().writeBigInt(1);
+    writer.list().endList();
+
+    // write the inner list vector(0)(1), this time with float8 values
+    writer.list().startList();
+    writer.list().float8().writeFloat8(3.0D);
+    writer.list().float8().writeFloat8(7.0D);
+    writer.list().endList();
+
+    // finish the outer list vector(0)
+    writer.endList();
+
+    writer.setPosition(1);
+
+    // start writing the outer list vector(1)
+    writer.startList();
+
+    // write the inner list vector(1)(0), this time with int values
+    writer.list().startList();
+    writer.list().integer().writeInt(3);
+    writer.list().integer().writeInt(2);
+    writer.list().endList();
+
+    // finish the outer list vector(1)
+    writer.endList();
+
+    vector.setValueCount(2);
+
+    return schemaRoot;
+  }
+
+  @Test
+  public void testSchemaSync() {
+    // create a vector schema root whose vector structure has diverged from its schema
+    try (VectorSchemaRoot schemaRoot = createBatch()) {
+      Schema newSchema = new Schema(
+              schemaRoot.getFieldVectors().stream().map(vec -> vec.getField()).collect(Collectors.toList()));
+
+      assertNotEquals(newSchema, schemaRoot.getSchema());
+      assertTrue(schemaRoot.syncSchema());
+      assertEquals(newSchema, schemaRoot.getSchema());
+
+      // the schema is already in sync, so no update happens this time.
+      assertFalse(schemaRoot.syncSchema());
+    }
+  }
 }