You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/08 19:45:32 UTC
[arrow] branch master updated: ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 78351cec5a ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)
78351cec5a is described below
commit 78351cec5afe97f94050118e0fdeaa14f385178c
Author: Igor Suhorukov <ig...@gmail.com>
AuthorDate: Mon Aug 8 22:45:28 2022 +0300
ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory (#13760) (#13811)
This PR allows developers to create a Dataset from Arrow IPC files in JVM code like:
`FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.ARROW_IPC, arrowDatasetURL);`
It is the foundation for an Apache Spark Arrow data source to process huge existing partitioned datasets in the Arrow file format without additional data format conversion.
Lead-authored-by: Igor Suhorukov <ig...@gmail.com>
Co-authored-by: igor.suhorukov <ig...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
java/dataset/src/main/cpp/jni_wrapper.cc | 2 ++
.../org/apache/arrow/dataset/file/FileFormat.java | 1 +
.../arrow/dataset/file/TestFileSystemDataset.java | 41 ++++++++++++++++++++++
3 files changed, 44 insertions(+)
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index e96dfb8aed..d088163903 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -89,6 +89,8 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFormat>> GetFileFormat(
switch (file_format_id) {
case 0:
return std::make_shared<arrow::dataset::ParquetFileFormat>();
+ case 1:
+ return std::make_shared<arrow::dataset::IpcFileFormat>();
default:
std::string error_message =
"illegal file format id: " + std::to_string(file_format_id);
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index 107fc2f71d..343e458ce2 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -22,6 +22,7 @@ package org.apache.arrow.dataset.file;
*/
public enum FileFormat {
PARQUET(0),
+ ARROW_IPC(1),
NONE(-1);
private final int id;
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 92610b1145..2fd8a19bac 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -46,11 +47,15 @@ import org.apache.arrow.dataset.jni.TestNativeDataset;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
@@ -316,6 +321,42 @@ public class TestFileSystemDataset extends TestNativeDataset {
AutoCloseables.close(factory);
}
+ @Test
+ public void testBaseArrowIpcRead() throws Exception {
+ File dataFile = TMP.newFile();
+ Schema sourceSchema = new Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, true))));
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(sourceSchema, rootAllocator());
+ FileOutputStream sink = new FileOutputStream(dataFile);
+ ArrowFileWriter writer = new ArrowFileWriter(root, /*dictionaryProvider=*/null, sink.getChannel())) {
+ IntVector ints = (IntVector) root.getVector(0);
+ ints.setSafe(0, 0);
+ ints.setSafe(1, 1024);
+ ints.setSafe(2, Integer.MAX_VALUE);
+ root.setRowCount(3);
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ }
+
+ String arrowDataURI = dataFile.toURI().toString();
+ FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.ARROW_IPC, arrowDataURI);
+ ScanOptions options = new ScanOptions(100);
+ Schema schema = inferResultSchemaFromFactory(factory, options);
+ List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+ assertSingleTaskProduced(factory, options);
+ assertEquals(1, datum.size());
+ assertEquals(1, schema.getFields().size());
+ assertEquals("ints", schema.getFields().get(0).getName());
+
+ String expectedJsonUnordered = String.format("[[0],[1024],[%d]]", Integer.MAX_VALUE);
+ checkParquetReadResult(schema, expectedJsonUnordered, datum);
+
+ AutoCloseables.close(datum);
+ AutoCloseables.close(factory);
+ }
+
private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
throws IOException {
final ObjectMapper json = new ObjectMapper();