You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/24 11:44:51 UTC

[arrow] branch master updated: ARROW-17431: [Java] MapBinder to bind Arrow Map type to DB column (#13941)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 712e06bc47 ARROW-17431: [Java] MapBinder to bind Arrow Map type to DB column (#13941)
712e06bc47 is described below

commit 712e06bc4755e27633b8a991d7cc7f5956cfc6c7
Author: Igor Suhorukov <ig...@gmail.com>
AuthorDate: Wed Aug 24 14:44:38 2022 +0300

    ARROW-17431: [Java] MapBinder to bind Arrow Map type to DB column (#13941)
    
    Typical real life Arrow datasets contain map of primitive type. This PR introduce MapBinder mapping of primitive types map entries
    
    Authored-by: igor.suhorukov <ig...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 .../jdbc/binder/ColumnBinderArrowTypeVisitor.java  |   3 +-
 .../arrow/adapter/jdbc/binder/MapBinder.java       |  90 ++++++++++
 .../adapter/jdbc/JdbcParameterBinderTest.java      | 184 +++++++++++++++++++++
 3 files changed, 276 insertions(+), 1 deletion(-)

diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
index f790b6a541..dc70872404 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
@@ -45,6 +45,7 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 
 /**
@@ -99,7 +100,7 @@ public class ColumnBinderArrowTypeVisitor implements ArrowType.ArrowTypeVisitor<
 
   @Override
   public ColumnBinder visit(ArrowType.Map type) {
-    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+    return new MapBinder((MapVector) vector);
   }
 
   @Override
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/MapBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/MapBinder.java
new file mode 100644
index 0000000000..07391eb7cb
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/MapBinder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.JsonStringHashMap;
+
+/**
+ * A column binder for map of primitive values.
+ */
+public class MapBinder extends BaseColumnBinder<MapVector> {
+
+  private UnionMapReader reader;
+  private final boolean isTextKey;
+  private final boolean isTextValue;
+
+  public MapBinder(MapVector vector) {
+    this(vector, Types.VARCHAR);
+  }
+
+  /**
+   * Init MapBinder and determine type of data vector.
+   *
+   * @param vector corresponding data vector from arrow buffer for binding
+   * @param jdbcType parameter jdbc type
+   */
+  public MapBinder(MapVector vector, int jdbcType) {
+    super(vector, jdbcType);
+    reader = vector.getReader();
+    List<Field> structField = Objects.requireNonNull(vector.getField()).getChildren();
+    if (structField.size() != 1) {
+      throw new IllegalArgumentException("Expected Struct field metadata inside Map field");
+    }
+    List<Field> keyValueFields = Objects.requireNonNull(structField.get(0)).getChildren();
+    if (keyValueFields.size() != 2) {
+      throw new IllegalArgumentException("Expected two children fields " +
+                                         "inside nested Struct field in Map");
+    }
+    ArrowType keyType = Objects.requireNonNull(keyValueFields.get(0)).getType();
+    ArrowType valueType = Objects.requireNonNull(keyValueFields.get(1)).getType();
+    isTextKey = ArrowType.Utf8.INSTANCE.equals(keyType);
+    isTextValue = ArrowType.Utf8.INSTANCE.equals(valueType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement,
+                   int parameterIndex, int rowIndex) throws SQLException {
+    reader.setPosition(rowIndex);
+    LinkedHashMap<Object, Object> tags = new JsonStringHashMap<>();
+    while (reader.next()) {
+      Object key = reader.key().readObject();
+      Object value = reader.value().readObject();
+      tags.put(isTextKey && key != null ? key.toString() : key,
+               isTextValue && value != null ? value.toString() : value);
+    }
+    switch (jdbcType) {
+      case Types.VARCHAR:
+        statement.setString(parameterIndex, tags.toString());
+        break;
+      case Types.OTHER:
+      default:
+        statement.setObject(parameterIndex, tags);
+    }
+  }
+}
diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
index fb4e6e5eb8..15b9ab0386 100644
--- a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
@@ -30,6 +30,7 @@ import java.sql.Types;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiConsumer;
 
 import org.apache.arrow.adapter.jdbc.binder.ColumnBinder;
@@ -69,6 +70,7 @@ import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -76,6 +78,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.JsonStringHashMap;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -473,6 +476,81 @@ public class JdbcParameterBinderTest {
     testListType((ArrowType) new ArrowType.Utf8(), setValue, ListVector::setNull, values);
   }
 
+  @Test
+  void mapOfString() throws SQLException {
+    TriConsumer<MapVector, Integer, Map<String, String>> setValue = (mapVector, index, values) -> {
+      org.apache.arrow.vector.complex.impl.UnionMapWriter mapWriter = mapVector.getWriter();
+      mapWriter.setPosition(index);
+      mapWriter.startMap();
+      values.entrySet().forEach(mapValue -> {
+        if (mapValue != null) {
+          byte[] keyBytes = mapValue.getKey().getBytes(StandardCharsets.UTF_8);
+          byte[] valueBytes = mapValue.getValue().getBytes(StandardCharsets.UTF_8);
+          try (
+              ArrowBuf keyBuf = allocator.buffer(keyBytes.length);
+              ArrowBuf valueBuf = allocator.buffer(valueBytes.length);
+          ) {
+            mapWriter.startEntry();
+            keyBuf.writeBytes(keyBytes);
+            valueBuf.writeBytes(valueBytes);
+            mapWriter.key().varChar().writeVarChar(0, keyBytes.length, keyBuf);
+            mapWriter.value().varChar().writeVarChar(0, valueBytes.length, valueBuf);
+            mapWriter.endEntry();
+          }
+        } else {
+          mapWriter.writeNull();
+        }
+      });
+      mapWriter.endMap();
+    };
+
+    JsonStringHashMap<String, String> value1 = new JsonStringHashMap<String, String>();
+    value1.put("a", "b");
+    value1.put("c", "d");
+    JsonStringHashMap<String, String> value2 = new JsonStringHashMap<String, String>();
+    value2.put("d", "e");
+    value2.put("f", "g");
+    value2.put("k", "l");
+    JsonStringHashMap<String, String> value3 = new JsonStringHashMap<String, String>();
+    value3.put("y", "z");
+    value3.put("arrow", "cool");
+    List<Map<String, String>> values = Arrays.asList(value1, value2, value3, Collections.emptyMap());
+    testMapType(new ArrowType.Map(true), setValue, MapVector::setNull, values, new ArrowType.Utf8());
+  }
+
+  @Test
+  void mapOfInteger() throws SQLException {
+    TriConsumer<MapVector, Integer, Map<Integer, Integer>> setValue = (mapVector, index, values) -> {
+      org.apache.arrow.vector.complex.impl.UnionMapWriter mapWriter = mapVector.getWriter();
+      mapWriter.setPosition(index);
+      mapWriter.startMap();
+      values.entrySet().forEach(mapValue -> {
+        if (mapValue != null) {
+          mapWriter.startEntry();
+          mapWriter.key().integer().writeInt(mapValue.getKey());
+          mapWriter.value().integer().writeInt(mapValue.getValue());
+          mapWriter.endEntry();
+        } else {
+          mapWriter.writeNull();
+        }
+      });
+      mapWriter.endMap();
+    };
+
+    JsonStringHashMap<Integer, Integer> value1 = new JsonStringHashMap<Integer, Integer>();
+    value1.put(1, 2);
+    value1.put(3, 4);
+    JsonStringHashMap<Integer, Integer> value2 = new JsonStringHashMap<Integer, Integer>();
+    value2.put(5, 6);
+    value2.put(7, 8);
+    value2.put(9, 1024);
+    JsonStringHashMap<Integer, Integer> value3 = new JsonStringHashMap<Integer, Integer>();
+    value3.put(Integer.MIN_VALUE, Integer.MAX_VALUE);
+    value3.put(0, 4096);
+    List<Map<Integer, Integer>> values = Arrays.asList(value1, value2, value3, Collections.emptyMap());
+    testMapType(new ArrowType.Map(true), setValue, MapVector::setNull, values, new ArrowType.Int(32, true));
+  }
+
   @FunctionalInterface
   interface TriConsumer<T, U, V> {
     void accept(T value1, U value2, V value3);
@@ -672,4 +750,110 @@ public class JdbcParameterBinderTest {
       assertThat(binder.next()).isFalse();
     }
   }
+
+  <T, V extends FieldVector> void testMapType(ArrowType arrowType, TriConsumer<V, Integer, T> setValue,
+                                              BiConsumer<V, Integer> setNull, List<T> values,
+                                              ArrowType elementType) throws SQLException {
+    int jdbcType = Types.VARCHAR;
+    FieldType keyType = new FieldType(false, elementType, null, null);
+    FieldType mapType = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
+    Schema schema = new Schema(Collections.singletonList(new Field("field", FieldType.nullable(arrowType),
+            Collections.singletonList(new Field(MapVector.KEY_NAME, mapType,
+                    Arrays.asList(new Field(MapVector.KEY_NAME, keyType, null),
+                            new Field(MapVector.VALUE_NAME, keyType, null)))))));
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root).bindAll().build();
+      assertThat(binder.next()).isFalse();
+
+      @SuppressWarnings("unchecked")
+      final V vector = (V) root.getVector(0);
+      final ColumnBinder columnBinder = ColumnBinder.forVector(vector);
+      assertThat(columnBinder.getJdbcType()).isEqualTo(jdbcType);
+
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(1));
+      setNull.accept(vector, 2);
+      root.setRowCount(3);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0).toString());
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1).toString());
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+      assertThat(binder.next()).isFalse();
+
+      binder.reset();
+
+      setNull.accept(vector, 0);
+      setValue.accept(vector, 1, values.get(3));
+      setValue.accept(vector, 2, values.get(0));
+      setValue.accept(vector, 3, values.get(2));
+      setValue.accept(vector, 4, values.get(1));
+      root.setRowCount(5);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(3).toString());
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0).toString());
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2).toString());
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1).toString());
+      assertThat(binder.next()).isFalse();
+    }
+
+    // Non-nullable (since some types have a specialized binder)
+    schema = new Schema(Collections.singletonList(new Field("field", FieldType.notNullable(arrowType),
+            Collections.singletonList(new Field(MapVector.KEY_NAME, mapType,
+                    Arrays.asList(new Field(MapVector.KEY_NAME, keyType, null),
+                            new Field(MapVector.VALUE_NAME, keyType, null)))))));
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      @SuppressWarnings("unchecked")
+      final V vector = (V) root.getVector(0);
+
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root).bind(1,
+                  new org.apache.arrow.adapter.jdbc.binder.MapBinder((MapVector) vector, Types.OTHER)).build();
+      assertThat(binder.next()).isFalse();
+
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(1));
+      root.setRowCount(2);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isFalse();
+
+      binder.reset();
+
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(2));
+      setValue.accept(vector, 2, values.get(0));
+      setValue.accept(vector, 3, values.get(2));
+      setValue.accept(vector, 4, values.get(1));
+      root.setRowCount(5);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isFalse();
+    }
+  }
 }