Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/08 19:45:32 UTC

[arrow] branch master updated: ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 78351cec5a ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)
78351cec5a is described below

commit 78351cec5afe97f94050118e0fdeaa14f385178c
Author: Igor Suhorukov <ig...@gmail.com>
AuthorDate: Mon Aug 8 22:45:28 2022 +0300

    ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)
    
    This PR allows developers to create a Dataset from Arrow IPC files in JVM code like:
    `FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
                FileFormat.ARROW_IPC, arrowDatasetURL);`
    
    It lays the foundation for an Apache Spark Arrow data source that can process huge existing partitioned datasets in the Arrow file format without an additional data format conversion step (a fuller end-to-end sketch follows below).
    
    Lead-authored-by: Igor Suhorukov <ig...@gmail.com>
    Co-authored-by: igor.suhorukov <ig...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
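
For readers trying the new format end to end, here is a minimal sketch. It is an illustration, not part of the commit: it assumes the Java Dataset scan API of this release line, in which Scanner.scan() yields ScanTask instances (later releases replace this with Scanner.scanBatches()), and file:///tmp/data.arrow is a placeholder URI.

    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.ScanTask;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;

    public class ArrowIpcDatasetExample {
      public static void main(String[] args) throws Exception {
        String uri = "file:///tmp/data.arrow";  // placeholder: any Arrow IPC file
        try (BufferAllocator allocator = new RootAllocator();
             FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
                 allocator, NativeMemoryPool.getDefault(), FileFormat.ARROW_IPC, uri);
             Dataset dataset = factory.finish();
             Scanner scanner = dataset.newScan(new ScanOptions(/*batchSize=*/32768))) {
          // Each scan task produces a reader over a stream of record batches.
          for (ScanTask task : scanner.scan()) {
            try (ArrowReader reader = task.execute()) {
              while (reader.loadNextBatch()) {
                VectorSchemaRoot root = reader.getVectorSchemaRoot();
                System.out.println(root.contentToTSVString());
              }
            }
          }
        }
      }
    }
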
---
 java/dataset/src/main/cpp/jni_wrapper.cc           |  2 ++
 .../org/apache/arrow/dataset/file/FileFormat.java  |  1 +
 .../arrow/dataset/file/TestFileSystemDataset.java  | 41 ++++++++++++++++++++++
 3 files changed, 44 insertions(+)

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index e96dfb8aed..d088163903 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -89,6 +89,8 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFormat>> GetFileFormat(
   switch (file_format_id) {
     case 0:
       return std::make_shared<arrow::dataset::ParquetFileFormat>();
+    case 1:
+      return std::make_shared<arrow::dataset::IpcFileFormat>();
     default:
       std::string error_message =
           "illegal file format id: " + std::to_string(file_format_id);
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index 107fc2f71d..343e458ce2 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -22,6 +22,7 @@ package org.apache.arrow.dataset.file;
  */
 public enum FileFormat {
   PARQUET(0),
+  ARROW_IPC(1),
   NONE(-1);
 
   private final int id;
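
The enum's integer id is the contract with the JNI switch above: the value crossing the JNI boundary must match a case label in GetFileFormat, or the native side raises the "illegal file format id" error. A minimal illustrative check, assuming the id() accessor this enum defines for the field shown above:

    // Illustrative only: the ids must stay in sync with the case labels in
    // GetFileFormat (java/dataset/src/main/cpp/jni_wrapper.cc).
    assert FileFormat.PARQUET.id() == 0;    // case 0 -> ParquetFileFormat
    assert FileFormat.ARROW_IPC.id() == 1;  // case 1 -> IpcFileFormat (added here)
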
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 92610b1145..2fd8a19bac 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,11 +47,15 @@ import org.apache.arrow.dataset.jni.TestNativeDataset;
 import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -316,6 +321,42 @@ public class TestFileSystemDataset extends TestNativeDataset {
     AutoCloseables.close(factory);
   }
 
+  @Test
+  public void testBaseArrowIpcRead() throws Exception {
+    File dataFile = TMP.newFile();
+    Schema sourceSchema = new Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, true))));
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(sourceSchema, rootAllocator());
+         FileOutputStream sink = new FileOutputStream(dataFile);
+         ArrowFileWriter writer = new ArrowFileWriter(root, /*dictionaryProvider=*/null, sink.getChannel())) {
+      IntVector ints = (IntVector) root.getVector(0);
+      ints.setSafe(0, 0);
+      ints.setSafe(1, 1024);
+      ints.setSafe(2, Integer.MAX_VALUE);
+      root.setRowCount(3);
+      writer.start();
+      writer.writeBatch();
+      writer.end();
+    }
+
+    String arrowDataURI = dataFile.toURI().toString();
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.ARROW_IPC, arrowDataURI);
+    ScanOptions options = new ScanOptions(100);
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(1, datum.size());
+    assertEquals(1, schema.getFields().size());
+    assertEquals("ints", schema.getFields().get(0).getName());
+
+    String expectedJsonUnordered = String.format("[[0],[1024],[%d]]", Integer.MAX_VALUE);
+    checkParquetReadResult(schema, expectedJsonUnordered, datum);
+
+    AutoCloseables.close(datum);
+    AutoCloseables.close(factory);
+  }
+
   private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
       throws IOException {
     final ObjectMapper json = new ObjectMapper();