Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/17 14:37:45 UTC

[GitHub] [flink] dianfu opened a new pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112
 
 
   
   ## What is the purpose of the change
   
   *This PR introduces ArrowReader, which reads the execution results of vectorized Python UDFs, and ArrowWriter, which converts Flink rows to Arrow format before they are sent to the Python worker for vectorized Python UDF execution.*
   
   ## Brief change log
   
     - *Introduce ArrowReader, which reads the execution results of vectorized Python UDFs.*
     - *Introduce ArrowWriter, which converts Flink rows to Arrow format before they are sent to the Python worker for vectorized Python UDF execution.*
     - *Introduce ArrowUtils, which creates ArrowReader and ArrowWriter instances.*
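On the schema side, ArrowUtils maps each Flink logical type to an Arrow type. For the four integer types covered by this PR, that mapping can be sketched in plain Java. This is a minimal sketch, not the Flink or Arrow API: `IntegerLogicalType`, `arrowBitWidth` and `fromArrowBitWidth` are hypothetical names, and the bit widths are assumed from the expectations in `ArrowUtilsTest` (`ArrowType.Int(8, true)` for TINYINT through `ArrowType.Int(8 * 8, true)` for BIGINT).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ArrowIntMappingSketch {

    // Hypothetical stand-in for the Flink logical integer types handled in this PR.
    enum IntegerLogicalType { TINYINT, SMALLINT, INT, BIGINT }

    // Bit width of the signed Arrow integer used for each logical type,
    // mirroring ArrowType.Int(bitWidth, true) in ArrowUtilsTest.
    static int arrowBitWidth(IntegerLogicalType type) {
        switch (type) {
            case TINYINT:
                return 8;
            case SMALLINT:
                return 16;
            case INT:
                return 32;
            case BIGINT:
                return 64;
            default:
                throw new UnsupportedOperationException("Unsupported type " + type);
        }
    }

    // Inverse direction (Arrow -> logical), analogous to the round trip
    // checked in testConvertBetweenLogicalTypeAndArrowType.
    static IntegerLogicalType fromArrowBitWidth(int bitWidth) {
        for (IntegerLogicalType type : IntegerLogicalType.values()) {
            if (arrowBitWidth(type) == bitWidth) {
                return type;
            }
        }
        throw new UnsupportedOperationException("Unsupported bit width " + bitWidth);
    }

    public static void main(String[] args) {
        // Field names and types follow the test fixture f1..f4.
        Map<String, IntegerLogicalType> fields = new LinkedHashMap<>();
        fields.put("f1", IntegerLogicalType.TINYINT);
        fields.put("f2", IntegerLogicalType.SMALLINT);
        fields.put("f3", IntegerLogicalType.INT);
        fields.put("f4", IntegerLogicalType.BIGINT);

        for (Map.Entry<String, IntegerLogicalType> field : fields.entrySet()) {
            int width = arrowBitWidth(field.getValue());
            // Converting back must yield the original logical type.
            if (fromArrowBitWidth(width) != field.getValue()) {
                throw new IllegalStateException("Round trip failed for " + field.getKey());
            }
            System.out.println(field.getKey() + ": " + field.getValue() + " -> Int(" + width + ", signed)");
        }
    }
}
```

The excerpt of ArrowUtils below appears to implement the forward direction with a type visitor (`LogicalTypeToArrowTypeConverter`) rather than a switch; the switch here only keeps the sketch self-contained.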
   
   ## Verifying this change
   
   This change adds tests and can be verified as follows:
   
     - *Added test ArrowUtilsTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#discussion_r382389771
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/BaseRowArrowReader.java
 ##########
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
 
 Review comment:
   Should this class go into the `readers` package? The same applies to RowArrowReader.

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#discussion_r382384546
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
 ##########
 @@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
+
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.IntWriter;
+import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.types.Row;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ArrowUtils}.
+ */
+public class ArrowUtilsTest {
+
+	private static List<Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>>> testFields;
+	private static RowType rowType;
+	private static BufferAllocator allocator;
+
+	@BeforeClass
+	public static void init() {
+		testFields = new ArrayList<>();
+		testFields.add(Tuple7.of(
+			"f1", new TinyIntType(), new ArrowType.Int(8, true), TinyIntWriter.class,
+			BaseRowTinyIntWriter.class, TinyIntFieldReader.class, ArrowTinyIntColumnVector.class));
+		testFields.add(Tuple7.of("f2", new SmallIntType(), new ArrowType.Int(8 * 2, true),
+			SmallIntWriter.class, BaseRowSmallIntWriter.class, SmallIntFieldReader.class, ArrowSmallIntColumnVector.class));
+		testFields.add(Tuple7.of("f3", new IntType(), new ArrowType.Int(8 * 4, true),
+			IntWriter.class, BaseRowIntWriter.class, IntFieldReader.class, ArrowIntColumnVector.class));
+		testFields.add(Tuple7.of("f4", new BigIntType(), new ArrowType.Int(8 * 8, true),
+			BigIntWriter.class, BaseRowBigIntWriter.class, BigIntFieldReader.class, ArrowBigIntColumnVector.class));
+
+		List<RowType.RowField> rowFields = new ArrayList<>();
+		for (Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>> field : testFields) {
+			rowFields.add(new RowType.RowField(field.f0, field.f1));
+		}
+		rowType = new RowType(rowFields);
+
+		allocator = ArrowUtils.ROOT_ALLOCATOR.newChildAllocator(
+			"stdout", 0, Long.MAX_VALUE);
+	}
+
+	@Test
+	public void testConvertBetweenLogicalTypeAndArrowType() {
+		Schema schema = ArrowUtils.toArrowSchema(rowType);
+
+		assertEquals(testFields.size(), schema.getFields().size());
+		List<Field> fields = schema.getFields();
+		for (int i = 0; i < schema.getFields().size(); i++) {
+			// verify convert from RowType to ArrowType
+			assertEquals(testFields.get(i).f0, fields.get(i).getName());
+			assertEquals(testFields.get(i).f2, fields.get(i).getType());
+			// verify convert from ArrowType to LogicalType
+			assertEquals(testFields.get(i).f1, ArrowUtils.fromArrowField(fields.get(i)));
+		}
+	}
+
+	@Test
+	public void testCreateArrowReader() {
+		VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+		RowArrowReader reader = ArrowUtils.createArrowReader(root);
+		ArrowFieldReader[] fieldReaders = reader.getFieldReaders();
+		for (int i = 0; i < fieldReaders.length; i++) {
+			assertEquals(testFields.get(i).f5, fieldReaders[i].getClass());
+		}
+	}
+
+	@Test
+	public void testCreateBlinkArrowReader() {
+		VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+		BaseRowArrowReader reader = ArrowUtils.createBlinkArrowReader(root);
+		ColumnVector[] columnVectors = reader.getColumnVectors();
+		for (int i = 0; i < columnVectors.length; i++) {
+			assertEquals(testFields.get(i).f6, columnVectors[i].getClass());
+		}
+	}
+
+	@Test
+	public void testCreateArrowWriter() {
+		VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+		ArrowWriter<Row> writer = ArrowUtils.createArrowWriter(root);
+		ArrowFieldWriter<Row>[] fieldWriters = writer.getFieldWriters();
+		for (int i = 0; i < fieldWriters.length; i++) {
+			assertEquals(testFields.get(i).f3, fieldWriters[i].getClass());
 
 Review comment:
   Can we also test the functionality of both the writer and the reader? For example, write some rows, read them back, and check the results. Currently we only check the class names.
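A read-after-write check could take roughly the following shape. This is a sketch under stated assumptions: `ColumnarBatch`, `write`, `read` and `roundTrips` are hypothetical, and plain `Object[]` columns stand in for the Arrow vectors of a `VectorSchemaRoot`, so the sketch needs neither Arrow nor Flink. In the real test the writer side would be `ArrowWriter<Row>` and the reader side `RowArrowReader`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundTripSketch {

    // Hypothetical columnar batch: one Object[] per field, standing in for Arrow field vectors.
    static final class ColumnarBatch {
        final Object[][] columns;
        int rowCount;

        ColumnarBatch(int fieldCount, int capacity) {
            columns = new Object[fieldCount][capacity];
        }
    }

    // Hypothetical writer: copies each row field by field into the matching column.
    static ColumnarBatch write(List<Object[]> rows, int fieldCount) {
        ColumnarBatch batch = new ColumnarBatch(fieldCount, rows.size());
        for (Object[] row : rows) {
            for (int f = 0; f < fieldCount; f++) {
                batch.columns[f][batch.rowCount] = row[f];
            }
            batch.rowCount++;
        }
        return batch;
    }

    // Hypothetical reader: reassembles the i-th row from the columns.
    static Object[] read(ColumnarBatch batch, int i) {
        Object[] row = new Object[batch.columns.length];
        for (int f = 0; f < row.length; f++) {
            row[f] = batch.columns[f][i];
        }
        return row;
    }

    // Writes the rows, reads them back, and reports whether every row survived unchanged.
    static boolean roundTrips(List<Object[]> rows, int fieldCount) {
        ColumnarBatch batch = write(rows, fieldCount);
        if (batch.rowCount != rows.size()) {
            return false;
        }
        for (int i = 0; i < rows.size(); i++) {
            if (!Arrays.equals(rows.get(i), read(batch, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // TINYINT, SMALLINT, INT, BIGINT values, including nulls.
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {(byte) 1, (short) 2, 3, 4L});
        rows.add(new Object[] {null, (short) -2, null, Long.MAX_VALUE});
        System.out.println(roundTrips(rows, 4)); // prints true
    }
}
```

Including rows with null fields is worthwhile, since nullability is carried into the Arrow schema (`rowField.getType().isNullable()` in `toArrowField`).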

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#discussion_r382412135
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowReader.java
 ##########
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
+
+/**
+ * Reader which deserializes Arrow format data into Flink rows.
+ *
+ * @param <OUT> Type of the deserialized row.
+ */
+public interface ArrowReader<OUT> {
 
 Review comment:
   Please annotate this interface with `@Internal`, like `ArrowUtils`.

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#discussion_r382381840
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
 ##########
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.IntWriter;
+import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Arrow.
+ */
+@Internal
+public final class ArrowUtils {
+
+	public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+
+	/**
+	 * Returns the Arrow schema of the specified type.
+	 */
+	public static Schema toArrowSchema(RowType rowType) {
+		Collection<Field> fields = rowType.getFields().stream()
+			.map(ArrowUtils::toArrowField)
+			.collect(Collectors.toCollection(ArrayList::new));
+		return new Schema(fields);
+	}
+
+	private static Field toArrowField(RowType.RowField rowField) {
+		FieldType fieldType = new FieldType(
+			rowField.getType().isNullable(),
+			rowField.getType().accept(LogicalTypeToArrowTypeConverter.INSTANCE),
+			null);
+		return new Field(rowField.getName(), fieldType, null);
+	}
+
+	/**
+	 * Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}.
+	 */
+	public static ArrowWriter<Row> createArrowWriter(VectorSchemaRoot root) {
+		ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
+		List<FieldVector> vectors = root.getFieldVectors();
+		for (int i = 0; i < vectors.size(); i++) {
+			FieldVector vector = vectors.get(i);
+			vector.allocateNew();
+			fieldWriters[i] = createArrowFieldWriter(vector);
+		}
+
+		return new ArrowWriter<>(root, fieldWriters);
+	}
+
+	private static ArrowFieldWriter<Row> createArrowFieldWriter(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new TinyIntWriter((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new SmallIntWriter((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new IntWriter((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new BigIntWriter((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	/**
+	 * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}.
+	 */
+	public static ArrowWriter<BaseRow> createBlinkArrowWriter(VectorSchemaRoot root) {
+		ArrowFieldWriter<BaseRow>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
+		List<FieldVector> vectors = root.getFieldVectors();
+		for (int i = 0; i < vectors.size(); i++) {
+			FieldVector vector = vectors.get(i);
+			vector.allocateNew();
+			fieldWriters[i] = createBlinkArrowFieldWriter(vector);
+		}
+
+		return new ArrowWriter<>(root, fieldWriters);
+	}
+
+	private static ArrowFieldWriter<BaseRow> createBlinkArrowFieldWriter(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new BaseRowTinyIntWriter((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new BaseRowSmallIntWriter((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new BaseRowIntWriter((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new BaseRowBigIntWriter((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	/**
+	 * Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}.
+	 */
+	public static RowArrowReader createArrowReader(VectorSchemaRoot root) {
+		List<ArrowFieldReader> fieldReaders = new ArrayList<>();
+		for (FieldVector vector : root.getFieldVectors()) {
+			fieldReaders.add(createFieldReader(vector));
+		}
+
+		return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0]));
+	}
+
+	private static ArrowFieldReader createFieldReader(FieldVector vector) {
 
 Review comment:
   Rename to `createArrowFieldReader`, making it consistent with `createArrowFieldWriter`. How about rearranging the method names in this class as follows:
   ```
   createArrowWriter -> createRowArrowWriter (corresponding with BaseRow)
   createArrowFieldWriter -> createRowArrowFieldWriter
   createBlinkArrowWriter -> createBaseRowArrowWriter (consistent with BaseRowXXXOperator and BaseRowXXXFunction)
   createBlinkArrowFieldWriter -> createBaseRowArrowFieldWriter
   createArrowReader -> createRowArrowReader
   createFieldReader -> createArrowFieldReader
   createBlinkArrowReader -> createBaseRowArrowReader
   fromArrowField -> fromArrowFieldToLogicalType (more explicit)
   fromArrowType -> fromArrowTypeToLogicalType
   ```
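To see how the proposed names line up, here is the per-field dispatch from `createArrowFieldWriter` restated as `createRowArrowFieldWriter` / `createRowArrowWriter`. The `Column` and `FieldWriter` types are hypothetical stand-ins for Arrow's `FieldVector` and Flink's `ArrowFieldWriter`, so the sketch compiles without either library; only the control flow (one writer per column, `UnsupportedOperationException` for anything else) mirrors the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldWriterFactorySketch {

    // Hypothetical stand-ins for Arrow vector classes; these are NOT the Arrow API.
    interface Column {
        String typeName();
    }

    static final class TinyIntColumn implements Column {
        public String typeName() { return "TINYINT"; }
    }

    static final class IntColumn implements Column {
        public String typeName() { return "INT"; }
    }

    static final class VarCharColumn implements Column {
        public String typeName() { return "VARCHAR"; }
    }

    // Hypothetical per-field writer; it only reports its kind so the dispatch is observable.
    interface FieldWriter {
        String kind();
    }

    // Same instanceof dispatch shape as createArrowFieldWriter, under the proposed name.
    static FieldWriter createRowArrowFieldWriter(Column column) {
        if (column instanceof TinyIntColumn) {
            return () -> "TinyIntWriter";
        } else if (column instanceof IntColumn) {
            return () -> "IntWriter";
        } else {
            throw new UnsupportedOperationException(
                String.format("Unsupported type %s.", column.typeName()));
        }
    }

    // Same shape as createArrowWriter: one field writer per column, in column order.
    static List<FieldWriter> createRowArrowWriter(List<Column> columns) {
        List<FieldWriter> writers = new ArrayList<>();
        for (Column column : columns) {
            writers.add(createRowArrowFieldWriter(column));
        }
        return writers;
    }

    public static void main(String[] args) {
        List<FieldWriter> writers = createRowArrowWriter(
            Arrays.<Column>asList(new TinyIntColumn(), new IntColumn()));
        for (FieldWriter writer : writers) {
            System.out.println(writer.kind());
        }
        try {
            createRowArrowFieldWriter(new VarCharColumn());
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A `BaseRow` variant (`createBaseRowArrowFieldWriter`) would have the same structure and differ only in the writer classes it returns.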

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149292329) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] dianfu commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#issuecomment-590184765
 
 
   @hequn8128 Thanks a lot for your review. I have updated the PR accordingly.

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * 059a6c132696e9f7dec67133ecbbf5e281c0debf Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150266372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5505) 
   
[GitHub] [flink] hequn8128 closed pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112
 
 
   

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150247518) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5495) 
   * 5c973405c3d613b89b42fa27430d1eb53f8acf30 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150257048) 
   * 059a6c132696e9f7dec67133ecbbf5e281c0debf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382381926
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldWriter.java
 ##########
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
 
 Review comment:
   Move this class into the `arrow.writers` package? The same applies to the corresponding reader class (`arrow.readers`).

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382375658
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
 ##########
 @@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.IntWriter;
+import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Arrow.
+ */
+@Internal
+public final class ArrowUtils {
+
+	public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+
+	/**
+	 * Returns the Arrow schema of the specified type.
+	 */
+	public static Schema toArrowSchema(RowType rowType) {
+		Collection<Field> fields = rowType.getFields().stream()
+			.map(ArrowUtils::toArrowField)
+			.collect(Collectors.toCollection(ArrayList::new));
+		return new Schema(fields);
+	}
+
+	private static Field toArrowField(RowType.RowField rowField) {
+		FieldType fieldType = new FieldType(
+			rowField.getType().isNullable(),
+			rowField.getType().accept(LogicalTypeToArrowTypeConverter.INSTANCE),
+			null);
+		return new Field(rowField.getName(), fieldType, null);
+	}
+
+	/**
+	 * Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}.
+	 */
+	public static ArrowWriter<Row> createArrowWriter(VectorSchemaRoot root) {
+		ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
+		List<FieldVector> vectors = root.getFieldVectors();
+		for (int i = 0; i < vectors.size(); i++) {
+			FieldVector vector = vectors.get(i);
+			vector.allocateNew();
+			fieldWriters[i] = createArrowFieldWriter(vector);
+		}
+
+		return new ArrowWriter<>(root, fieldWriters);
+	}
+
+	private static ArrowFieldWriter<Row> createArrowFieldWriter(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new TinyIntWriter((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new SmallIntWriter((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new IntWriter((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new BigIntWriter((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	/**
+	 * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}.
+	 */
+	public static ArrowWriter<BaseRow> createBlinkArrowWriter(VectorSchemaRoot root) {
+		ArrowFieldWriter<BaseRow>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
+		List<FieldVector> vectors = root.getFieldVectors();
+		for (int i = 0; i < vectors.size(); i++) {
+			FieldVector vector = vectors.get(i);
+			vector.allocateNew();
+			fieldWriters[i] = createBlinkArrowFieldWriter(vector);
+		}
+
+		return new ArrowWriter<>(root, fieldWriters);
+	}
+
+	private static ArrowFieldWriter<BaseRow> createBlinkArrowFieldWriter(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new BaseRowTinyIntWriter((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new BaseRowSmallIntWriter((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new BaseRowIntWriter((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new BaseRowBigIntWriter((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	/**
+	 * Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}.
+	 */
+	public static RowArrowReader createArrowReader(VectorSchemaRoot root) {
+		List<ArrowFieldReader> fieldReaders = new ArrayList<>();
+		for (FieldVector vector : root.getFieldVectors()) {
+			fieldReaders.add(createFieldReader(vector));
+		}
+
+		return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0]));
+	}
+
+	private static ArrowFieldReader createFieldReader(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new TinyIntFieldReader((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new SmallIntFieldReader((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new IntFieldReader((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new BigIntFieldReader((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	/**
+	 * Creates an {@link ArrowReader} for blink planner for the specified {@link VectorSchemaRoot}.
+	 */
+	public static BaseRowArrowReader createBlinkArrowReader(VectorSchemaRoot root) {
+		List<ColumnVector> columnVectors = new ArrayList<>();
+		for (FieldVector vector : root.getFieldVectors()) {
+			columnVectors.add(createColumnVector(vector));
+		}
+
+		return new BaseRowArrowReader(columnVectors.toArray(new ColumnVector[0]));
+	}
+
+	private static ColumnVector createColumnVector(FieldVector vector) {
+		if (vector instanceof TinyIntVector) {
+			return new ArrowTinyIntColumnVector((TinyIntVector) vector);
+		} else if (vector instanceof SmallIntVector) {
+			return new ArrowSmallIntColumnVector((SmallIntVector) vector);
+		} else if (vector instanceof IntVector) {
+			return new ArrowIntColumnVector((IntVector) vector);
+		} else if (vector instanceof BigIntVector) {
+			return new ArrowBigIntColumnVector((BigIntVector) vector);
+		} else {
+			throw new UnsupportedOperationException(String.format(
+				"Unsupported type %s.", fromArrowField(vector.getField())));
+		}
+	}
+
+	public static LogicalType fromArrowField(Field field) {
+		if (field.getType() == ArrowType.List.INSTANCE) {
+			LogicalType elementType = fromArrowField(field.getChildren().get(0));
+			return new ArrayType(field.isNullable(), elementType);
+		} else if (field.getType() == ArrowType.Struct.INSTANCE) {
+			List<RowType.RowField> fields = field.getChildren().stream().map(child -> {
+				LogicalType type = fromArrowField(child);
+				return new RowType.RowField(child.getName(), type, null);
+			}).collect(Collectors.toList());
+			return new RowType(field.isNullable(), fields);
+		} else if (field.getType() instanceof ArrowType.Map) {
+			Field elementField = field.getChildren().get(0);
+			LogicalType keyType = fromArrowField(elementField.getChildren().get(0));
+			LogicalType valueType = fromArrowField(elementField.getChildren().get(1));
+			return new MapType(field.isNullable(), keyType, valueType);
+		} else {
+			return fromArrowType(field.isNullable(), field.getType());
+		}
+	}
+
+	private static LogicalType fromArrowType(boolean isNullable, ArrowType arrowType) {
+		if (arrowType instanceof ArrowType.Int && ((ArrowType.Int) arrowType).getIsSigned()) {
+			ArrowType.Int intType = (ArrowType.Int) arrowType;
+			if (intType.getBitWidth() == 8) {
+				return new TinyIntType(isNullable);
+			} else if (intType.getBitWidth() == 8 * 2) {
+				return new SmallIntType(isNullable);
+			} else if (intType.getBitWidth() == 8 * 4) {
+				return new IntType(isNullable);
+			} else if (intType.getBitWidth() == 8 * 8) {
+				return new BigIntType(isNullable);
+			}
+		}
+		throw new UnsupportedOperationException(
+			String.format("Unexpected arrow type: %s.", arrowType.toString()));
+	}
+
+	private static class LogicalTypeToArrowTypeConverter implements LogicalTypeVisitor<ArrowType> {
 
 Review comment:
   How about extending `LogicalTypeDefaultVisitor` and throwing a readable exception for unsupported types in `defaultMethod`?
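For illustration, here is a minimal sketch of the pattern proposed in this comment: a base visitor routes every `visit` overload to a single `defaultMethod`, and the converter overrides only the types it supports, so every other type fails with one readable message. All names below (`SketchLogicalType`, `SketchDefaultVisitor`, `SketchToArrowBitWidth`, and so on) are hypothetical stand-ins, not Flink's `LogicalTypeDefaultVisitor` or Arrow's `ArrowType`; the converter returns a signed-integer bit width instead of an `ArrowType` so the sketch compiles without either dependency.

```java
// Hypothetical stand-ins for a visitable logical-type hierarchy (not Flink's classes).
abstract class SketchLogicalType {
    abstract <R> R accept(SketchTypeVisitor<R> visitor);
}

final class SketchTinyIntType extends SketchLogicalType {
    @Override
    <R> R accept(SketchTypeVisitor<R> visitor) { return visitor.visit(this); }

    @Override
    public String toString() { return "TINYINT"; }
}

final class SketchIntType extends SketchLogicalType {
    @Override
    <R> R accept(SketchTypeVisitor<R> visitor) { return visitor.visit(this); }

    @Override
    public String toString() { return "INT"; }
}

final class SketchVarCharType extends SketchLogicalType {
    @Override
    <R> R accept(SketchTypeVisitor<R> visitor) { return visitor.visit(this); }

    @Override
    public String toString() { return "VARCHAR"; }
}

interface SketchTypeVisitor<R> {
    R visit(SketchTinyIntType type);
    R visit(SketchIntType type);
    R visit(SketchVarCharType type);
}

// Every visit overload falls back to defaultMethod unless a subclass overrides it.
abstract class SketchDefaultVisitor<R> implements SketchTypeVisitor<R> {
    @Override
    public R visit(SketchTinyIntType type) { return defaultMethod(type); }

    @Override
    public R visit(SketchIntType type) { return defaultMethod(type); }

    @Override
    public R visit(SketchVarCharType type) { return defaultMethod(type); }

    protected abstract R defaultMethod(SketchLogicalType type);
}

// Maps supported types to a signed-integer bit width and rejects everything else.
final class SketchToArrowBitWidth extends SketchDefaultVisitor<Integer> {
    static final SketchToArrowBitWidth INSTANCE = new SketchToArrowBitWidth();

    private SketchToArrowBitWidth() {}

    @Override
    public Integer visit(SketchTinyIntType type) { return 8; }

    @Override
    public Integer visit(SketchIntType type) { return 32; }

    @Override
    protected Integer defaultMethod(SketchLogicalType type) {
        // The single place that produces the error for any type without an override.
        throw new UnsupportedOperationException(String.format("Unsupported type %s.", type));
    }
}
```

With this shape, supporting a new type means adding one override, and the error text lives in one place instead of being repeated in each unsupported `visit` method.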

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150247518) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5495) 
   * 5c973405c3d613b89b42fa27430d1eb53f8acf30 UNKNOWN
   

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252) 
   

[GitHub] [flink] flinkbot commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c UNKNOWN
   

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382413045
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
 ##########
 @@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
+
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.vector.ColumnVector;
+import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
+import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowSmallIntColumnVector;
+import org.apache.flink.table.runtime.arrow.vectors.ArrowTinyIntColumnVector;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowBigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowSmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BaseRowTinyIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.BigIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.IntWriter;
+import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
+import org.apache.flink.table.runtime.arrow.writers.TinyIntWriter;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.types.Row;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ArrowUtils}.
+ */
+public class ArrowUtilsTest {
+
+	private static List<Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>>> testFields;
+	private static RowType rowType;
+	private static BufferAllocator allocator;
+
+	@BeforeClass
+	public static void init() {
+		testFields = new ArrayList<>();
+		testFields.add(Tuple7.of(
+			"f1", new TinyIntType(), new ArrowType.Int(8, true), TinyIntWriter.class,
+			BaseRowTinyIntWriter.class, TinyIntFieldReader.class, ArrowTinyIntColumnVector.class));
+		testFields.add(Tuple7.of("f2", new SmallIntType(), new ArrowType.Int(8 * 2, true),
+			SmallIntWriter.class, BaseRowSmallIntWriter.class, SmallIntFieldReader.class, ArrowSmallIntColumnVector.class));
+		testFields.add(Tuple7.of("f3", new IntType(), new ArrowType.Int(8 * 4, true),
+			IntWriter.class, BaseRowIntWriter.class, IntFieldReader.class, ArrowIntColumnVector.class));
+		testFields.add(Tuple7.of("f4", new BigIntType(), new ArrowType.Int(8 * 8, true),
+			BigIntWriter.class, BaseRowBigIntWriter.class, BigIntFieldReader.class, ArrowBigIntColumnVector.class));
+
+		List<RowType.RowField> rowFields = new ArrayList<>();
+		for (Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>> field : testFields) {
+			rowFields.add(new RowType.RowField(field.f0, field.f1));
+		}
+		rowType = new RowType(rowFields);
+
+		allocator = ArrowUtils.ROOT_ALLOCATOR.newChildAllocator(
+			"stdout", 0, Long.MAX_VALUE);
+	}
+
+	@Test
+	public void testConvertBetweenLogicalTypeAndArrowType() {
+		Schema schema = ArrowUtils.toArrowSchema(rowType);
+
+		assertEquals(testFields.size(), schema.getFields().size());
+		List<Field> fields = schema.getFields();
+		for (int i = 0; i < schema.getFields().size(); i++) {
+			// verify conversion from RowType to ArrowType
+			assertEquals(testFields.get(i).f0, fields.get(i).getName());
+			assertEquals(testFields.get(i).f2, fields.get(i).getType());
+			// verify conversion from ArrowType to LogicalType
+			assertEquals(testFields.get(i).f1, ArrowUtils.fromArrowField(fields.get(i)));
+		}
+	}
+
+	@Test
+	public void testCreateArrowReader() {
+		VectorSchemaRoot root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+		RowArrowReader reader = ArrowUtils.createArrowReader(root);
+		ArrowFieldReader[] fieldReaders = reader.getFieldReaders();
+		for (int i = 0; i < fieldReaders.length; i++) {
+			assertEquals(testFields.get(i).f5, fieldReaders[i].getClass());
+		}
+	}
+
+	@Test
+	public void testCreateBlinkArrowReader() {
 
 Review comment:
   Rename to `testCreateBaseRowArrowReader`?
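The test being renamed is table-driven: each `Tuple7` in `testFields` pairs a logical type with the writer, reader, and column-vector classes that the `ArrowUtils` factories are expected to choose, and the test compares the `getClass()` of each created object against that table. A minimal, dependency-free sketch of the same check follows. The vector and reader classes are hypothetical stand-ins for the Arrow vectors and Flink field readers, and `createFieldReader` only mimics the `instanceof` chain in the diff above; it is not the real `ArrowUtils` method.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Arrow field vectors.
abstract class SketchVector {}
final class SketchTinyIntVector extends SketchVector {}
final class SketchIntVector extends SketchVector {}
final class SketchFloatVector extends SketchVector {}

// Hypothetical stand-ins for per-type field readers.
interface SketchFieldReader {}

final class SketchTinyIntFieldReader implements SketchFieldReader {
    final SketchTinyIntVector vector;
    SketchTinyIntFieldReader(SketchTinyIntVector vector) { this.vector = vector; }
}

final class SketchIntFieldReader implements SketchFieldReader {
    final SketchIntVector vector;
    SketchIntFieldReader(SketchIntVector vector) { this.vector = vector; }
}

final class SketchReaders {
    // Mimics the instanceof dispatch used by the factory methods in the diff.
    static SketchFieldReader createFieldReader(SketchVector vector) {
        if (vector instanceof SketchTinyIntVector) {
            return new SketchTinyIntFieldReader((SketchTinyIntVector) vector);
        } else if (vector instanceof SketchIntVector) {
            return new SketchIntFieldReader((SketchIntVector) vector);
        }
        throw new UnsupportedOperationException(
            "Unsupported vector " + vector.getClass().getSimpleName() + ".");
    }

    // Table-driven check: each vector must produce exactly the expected reader class.
    static void checkReaderClasses() {
        Map<SketchVector, Class<?>> expected = new LinkedHashMap<>();
        expected.put(new SketchTinyIntVector(), SketchTinyIntFieldReader.class);
        expected.put(new SketchIntVector(), SketchIntFieldReader.class);
        for (Map.Entry<SketchVector, Class<?>> entry : expected.entrySet()) {
            Class<?> actual = createFieldReader(entry.getKey()).getClass();
            if (actual != entry.getValue()) {
                throw new AssertionError("Expected " + entry.getValue() + " but got " + actual);
            }
        }
    }
}
```

Comparing exact classes (rather than `instanceof`) is what makes this style of test catch a factory that accidentally returns a more general reader.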

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382413407
 
 

 ##########
 File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
 ##########
 @@ -0,0 +1,147 @@
+	@Test
+	public void testCreateArrowReader() {
 
 Review comment:
   testCreateRowArrowReader
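   For context, the quoted `testConvertBetweenLogicalTypeAndArrowType` checks two things. TINYINT, SMALLINT, INT and BIGINT must map to signed `ArrowType.Int` instances of 8, 16, 32 and 64 bits, and each Arrow field must map back to the original type. The dependency-free sketch below models that round trip. `IntTypeSpec` and `IntegerTypeMapping` are hypothetical stand-ins for `ArrowType.Int` and `ArrowUtils`. This is not the code in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for Arrow's integer type: bit width plus signedness. */
final class IntTypeSpec {
    final int bitWidth;
    final boolean signed;

    IntTypeSpec(int bitWidth, boolean signed) {
        this.bitWidth = bitWidth;
        this.signed = signed;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntTypeSpec)) {
            return false;
        }
        IntTypeSpec other = (IntTypeSpec) o;
        return bitWidth == other.bitWidth && signed == other.signed;
    }

    @Override
    public int hashCode() {
        return 31 * bitWidth + (signed ? 1 : 0);
    }
}

/** Hypothetical sketch of the integer type mapping exercised by the test. */
final class IntegerTypeMapping {
    // Keys are Flink SQL type names; values mirror new ArrowType.Int(bits, true).
    private static final Map<String, IntTypeSpec> TO_ARROW = new LinkedHashMap<>();

    static {
        TO_ARROW.put("TINYINT", new IntTypeSpec(8, true));
        TO_ARROW.put("SMALLINT", new IntTypeSpec(8 * 2, true));
        TO_ARROW.put("INT", new IntTypeSpec(8 * 4, true));
        TO_ARROW.put("BIGINT", new IntTypeSpec(8 * 8, true));
    }

    /** Flink type name to Arrow integer spec (the toArrowSchema direction). */
    static IntTypeSpec toArrow(String flinkType) {
        IntTypeSpec spec = TO_ARROW.get(flinkType);
        if (spec == null) {
            throw new UnsupportedOperationException("Unsupported type: " + flinkType);
        }
        return spec;
    }

    /** Arrow integer spec back to the Flink type name (the fromArrowField direction). */
    static String fromArrow(IntTypeSpec spec) {
        for (Map.Entry<String, IntTypeSpec> e : TO_ARROW.entrySet()) {
            if (e.getValue().equals(spec)) {
                return e.getKey();
            }
        }
        throw new UnsupportedOperationException("Unsupported Arrow int of width " + spec.bitWidth);
    }
}
```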

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587025097
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit dba261b36b56aedec766097042dbe40858e5fc8c (Mon Feb 17 14:40:47 UTC 2020)
   
   **Warnings:**
    * **2 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
    <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382371474
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldReader.java
 ##########
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow;
 
 Review comment:
   According to the previous discussion, it was suggested that we add `python` to the package path. How about renaming the package to `org.apache.flink.table.runtime.python.arrow`?
   
   If we add a `python` package under `runtime`, we could also do some refactoring: the `python` sub-packages under `functions`, `operations` and `runners` could be removed. For example, we could rename `org.apache.flink.table.runtime.functions.python` to `org.apache.flink.table.runtime.python.functions`.
   
   
   

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382382416
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldWriter.java
 ##########
 @@ -0,0 +1,89 @@
+package org.apache.flink.table.runtime.arrow;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.arrow.vector.ValueVector;
+
+/**
+ * Base class for arrow field writer.
 
 Review comment:
   How about "Base class for arrow field writers, which are used to convert a field to an Arrow field."?
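   To make the suggested Javadoc concrete: a field writer of this kind wraps one column vector. It writes the value at a given ordinal of each input row into that vector and counts how many values it has written. The sketch below does not depend on Flink or Arrow, and every name in it is hypothetical: `IntListVector` is an in-memory stand-in for an Arrow `ValueVector`, and `FieldWriterSketch` and `IntFieldWriterSketch` are illustrative. Its method names loosely follow Arrow's `setSafe`, `setValueCount` and `reset`. It is not the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical in-memory stand-in for an Arrow value vector of nullable ints. */
final class IntListVector {
    private final List<Integer> values = new ArrayList<>();
    private int valueCount;

    /** Stores a value at the index, growing the backing list if needed. */
    void setSafe(int index, Integer value) {
        while (values.size() <= index) {
            values.add(null);
        }
        values.set(index, value);
    }

    void setValueCount(int count) { this.valueCount = count; }
    int getValueCount() { return valueCount; }
    Integer get(int index) { return values.get(index); }

    void reset() {
        values.clear();
        valueCount = 0;
    }
}

/** Base class for field writers, each converting one field of the input rows into a column vector. */
abstract class FieldWriterSketch<IN> {
    private final IntListVector vector;
    private int count;

    FieldWriterSketch(IntListVector vector) {
        this.vector = Objects.requireNonNull(vector);
    }

    IntListVector getVector() { return vector; }
    int getCount() { return count; }

    /** Writes the field at {@code ordinal} of {@code row} at position getCount() of the vector. */
    protected abstract void doWrite(IN row, int ordinal);

    /** Writes one value and advances the counter. */
    final void write(IN row, int ordinal) {
        doWrite(row, ordinal);
        count++;
    }

    /** Publishes the number of written values to the vector. */
    final void finish() { vector.setValueCount(count); }

    /** Clears the vector so the writer can be reused for the next batch. */
    final void reset() {
        vector.reset();
        count = 0;
    }
}

/** Writes an Integer field of an Object[] row; a null field stays null. */
final class IntFieldWriterSketch extends FieldWriterSketch<Object[]> {
    IntFieldWriterSketch(IntListVector vector) { super(vector); }

    @Override
    protected void doWrite(Object[] row, int ordinal) {
        getVector().setSafe(getCount(), (Integer) row[ordinal]);
    }
}
```

   Because the base class owns the counter and the finish/reset lifecycle, each concrete writer only has to implement `doWrite`.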

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252) 
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150247518) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150247518) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5495) 
   * 5c973405c3d613b89b42fa27430d1eb53f8acf30 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150257048) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * dba261b36b56aedec766097042dbe40858e5fc8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149292329) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5252) 
   * fc594cbe4670795421281fd5d19cd5b3d8109a9e UNKNOWN
   

[GitHub] [flink] dianfu commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r383100703
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowFieldReader.java
 ##########
 @@ -0,0 +1,54 @@
+package org.apache.flink.table.runtime.arrow;
 
 Review comment:
   I recall that we reached consensus to use `functions.python` instead of `python.functions`. However, I'm not sure which one is best either. How about discussing the package naming in a separate thread?

[GitHub] [flink] flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#issuecomment-587035354
 
 
   ## CI report:
   
   * 5c973405c3d613b89b42fa27430d1eb53f8acf30 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/150257048) 
   * 059a6c132696e9f7dec67133ecbbf5e281c0debf Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150266372) 
   

[GitHub] [flink] hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11112: [FLINK-16121][python] Introduce ArrowReader and ArrowWriter for Arrow format data read and write
URL: https://github.com/apache/flink/pull/11112#discussion_r382412615
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/vectors/ArrowBigIntColumnVector.java
 ##########
 @@ -0,0 +1,54 @@
+package org.apache.flink.table.runtime.arrow.vectors;
+
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/**
+ * Arrow column vector for BigInt.
+ */
+public final class ArrowBigIntColumnVector implements LongColumnVector {
 
 Review comment:
   Add @Internal. Same for other ColumnVectors.
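   For illustration, the requested change amounts to writing `@Internal public final class ArrowBigIntColumnVector implements LongColumnVector` and doing the same for each of the other vector classes. The adapter shape can be sketched without Flink or Arrow on the classpath. In the sketch, `InternalSketch`, `LongColumnVectorSketch` and `NullableLongArray` are hypothetical stand-ins for Flink's `@Internal`, Flink's `LongColumnVector` and Arrow's `BigIntVector`. The stand-in annotation uses RUNTIME retention only so that its presence can be checked reflectively.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

/** Hypothetical stand-in for Flink's @Internal marker annotation. */
@Retention(RetentionPolicy.RUNTIME) // RUNTIME only so the sketch can check it reflectively
@Target(ElementType.TYPE)
@interface InternalSketch {}

/** Hypothetical stand-in for Flink's LongColumnVector read interface. */
interface LongColumnVectorSketch {
    boolean isNullAt(int i);

    long getLong(int i);
}

/** Hypothetical stand-in for Arrow's BigIntVector: longs plus a null mask. */
final class NullableLongArray {
    private final long[] values;
    private final boolean[] isNull;

    NullableLongArray(Long... data) {
        values = new long[data.length];
        isNull = new boolean[data.length];
        for (int i = 0; i < data.length; i++) {
            isNull[i] = data[i] == null;
            // Only unbox when the slot is non-null.
            values[i] = isNull[i] ? 0L : data[i];
        }
    }

    boolean isNull(int i) { return isNull[i]; }

    long get(int i) { return values[i]; }
}

/** Read-only adapter exposing the Arrow-side data through the column vector interface. */
@InternalSketch
final class BigIntColumnVectorSketch implements LongColumnVectorSketch {
    private final NullableLongArray vector;

    BigIntColumnVectorSketch(NullableLongArray vector) {
        this.vector = Objects.requireNonNull(vector);
    }

    @Override
    public boolean isNullAt(int i) { return vector.isNull(i); }

    @Override
    public long getLong(int i) { return vector.get(i); }
}
```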
