You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/19 12:38:38 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #14151: ARROW-11776: [C++][Java] Support parquet write from ArrowReader to file

lidavidm commented on code in PR #14151:
URL: https://github.com/apache/arrow/pull/14151#discussion_r999338824


##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -29,6 +31,8 @@
 #include "org_apache_arrow_dataset_jni_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
 
+#include <iostream>

Review Comment:
   We shouldn't need `<iostream>` here; can this include be removed?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+import java.util.Iterator;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file.

Review Comment:
   The docstring needs updating: it still describes scanning an input `Scanner`, but the method now takes an `ArrowReader`.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -510,3 +526,55 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
   return CreateNativeRef(d);
   JNI_METHOD_END(-1L)
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    writeFromScannerToFile
+ * Signature:
+ * (JJJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
+    JNIEnv* env, jobject, jlong c_arrow_array_stream_address,
+    jlong file_format_id, jstring uri, jobjectArray partition_columns,
+    jint max_partitions, jstring base_name_template) {
+  JNI_METHOD_START
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    JniThrow("Unable to get JavaVM instance");
+  }
+
+  auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address);
+  struct ArrowSchema c_schema;
+  arrow_stream->get_schema(arrow_stream, &c_schema);
+
+  std::shared_ptr<arrow::RecordBatchReader> reader =
+      JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
+  // Release the ArrowArrayStream
+  ArrowArrayStreamRelease(arrow_stream);

Review Comment:
   `ImportRecordBatchReader` calls `ArrowArrayStreamMove`, so this explicit `ArrowArrayStreamRelease` call is not necessary.



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -510,3 +526,55 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
   return CreateNativeRef(d);
   JNI_METHOD_END(-1L)
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    writeFromScannerToFile
+ * Signature:
+ * (JJJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
+    JNIEnv* env, jobject, jlong c_arrow_array_stream_address,
+    jlong file_format_id, jstring uri, jobjectArray partition_columns,
+    jint max_partitions, jstring base_name_template) {
+  JNI_METHOD_START
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    JniThrow("Unable to get JavaVM instance");
+  }
+
+  auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address);
+  struct ArrowSchema c_schema;
+  arrow_stream->get_schema(arrow_stream, &c_schema);

Review Comment:
   Once you import the reader, you can use `reader->schema()`.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    } else {
+      currentReader = null;
+    }
+  }
+
+  @Override
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (currentReader == null) return false;
+    Boolean result = currentReader.loadNextBatch();

Review Comment:
   Why the boxed `Boolean` and not the primitive `boolean`?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+import java.util.Iterator;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   * @param maxPartitions maximum partitions to be included in written files
+   * @param partitionColumns columns used to partition output files. Empty to disable partitioning
+   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
+   *                         ID around all written files.
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
+                           String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
+    RuntimeException throwableWrapper = null;
+    try {
+      ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);
+      Data.exportArrayStream(allocator, reader, stream);
+      JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
+          format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
+      stream.close();
+    } catch (Throwable t) {
+      throwableWrapper = new RuntimeException(t);

Review Comment:
   nit: why are we declaring the exception separately?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+import java.util.Iterator;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   * @param maxPartitions maximum partitions to be included in written files
+   * @param partitionColumns columns used to partition output files. Empty to disable partitioning
+   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
+   *                         ID around all written files.
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
+                           String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
+    RuntimeException throwableWrapper = null;
+    try {
+      ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);

Review Comment:
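   Use try-with-resources for the stream, so that it is closed even when the write throws (currently `stream.close()` is skipped on an exception).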



##########
java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestDatasetFileWriter extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  @Test
+  public void testParquetWriteSimple() throws Exception {
+    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
+        1, "a", 2, "b", 3, "c", 2, "d");
+    String sampleParquet = writeSupport.getOutputURI();
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, sampleParquet);
+    ScanOptions options = new ScanOptions(new String[0], 100);
+    final Dataset dataset = factory.finish();
+    final Scanner scanner = dataset.newScan(options);
+    final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
+    final File writtenFolder = TMP.newFolder();
+    final String writtenParquet = writtenFolder.toURI().toString();
+    try {
+      DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet);
+      assertParquetFileEquals(sampleParquet, Objects.requireNonNull(writtenFolder.listFiles())[0].toURI().toString());
+    } finally {
+      AutoCloseables.close(factory, scanner, reader, dataset);

Review Comment:
   Use try-with-resources for everything.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+import java.util.Iterator;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   * @param maxPartitions maximum partitions to be included in written files
+   * @param partitionColumns columns used to partition output files. Empty to disable partitioning
+   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
+   *                         ID around all written files.
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
+                           String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
+    RuntimeException throwableWrapper = null;
+    try {
+      ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);
+      Data.exportArrayStream(allocator, reader, stream);
+      JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
+          format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
+      stream.close();
+    } catch (Throwable t) {
+      throwableWrapper = new RuntimeException(t);
+      throw throwableWrapper;
+    }
+  }
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) {
+    write(allocator, reader, format, uri, new String[0], 1024, "dat_{i}");

Review Comment:
   Should the default template be "data_{i}" rather than "dat_{i}"?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -170,6 +174,19 @@ class DisposableScannerAdaptor {
   }
 };
 
+std::shared_ptr<arrow::Schema> SchemaFromColumnNames(
+    const std::shared_ptr<arrow::Schema>& input,
+    const std::vector<std::string>& column_names) {
+  std::vector<std::shared_ptr<arrow::Field>> columns;
+  for (arrow::FieldRef ref : column_names) {
+    auto maybe_field = ref.GetOne(*input);
+    if (maybe_field.ok()) {
+      columns.push_back(std::move(maybe_field).ValueOrDie());
+    }

Review Comment:
   We should error if a column name isn't present?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    } else {
+      currentReader = null;

Review Comment:
   This `else` branch is redundant: `currentReader` is already initialized to `null` in its field declaration.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    } else {
+      currentReader = null;
+    }
+  }
+
+  @Override
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (currentReader == null) return false;
+    Boolean result = currentReader.loadNextBatch();
+
+    if (!result) {
+      try {
+        currentTask.close();
+        currentReader.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);

Review Comment:
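   Should this throw an `IOException` instead of a `RuntimeException`? The method already declares `throws IOException`.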



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    } else {
+      currentReader = null;
+    }
+  }
+
+  @Override
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (currentReader == null) return false;
+    Boolean result = currentReader.loadNextBatch();
+
+    if (!result) {
+      try {
+        currentTask.close();
+        currentReader.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      while (!result) {
+        if (!taskIterator.hasNext()) {
+          return false;
+        } else {
+          currentTask = taskIterator.next();
+          currentReader = currentTask.execute();
+          result = currentReader.loadNextBatch();
+        }
+      }
+    }
+
+    // Load the currentReader#VectorSchemaRoot to ArrowArray
+    VectorSchemaRoot vsr = currentReader.getVectorSchemaRoot();
+    ArrowArray array = ArrowArray.allocateNew(allocator);
+    Data.exportVectorSchemaRoot(allocator, vsr, currentReader, array);
+
+    // Load the ArrowArray into ArrowScannerReader#VectorSchemaRoot
+    CDataDictionaryProvider provider = new CDataDictionaryProvider();
+    Data.importIntoVectorSchemaRoot(allocator,
+        array, this.getVectorSchemaRoot(), provider);
+    array.close();
+    provider.close();

Review Comment:
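   Use VectorLoader/VectorUnloader to transfer the batch between the two roots, instead of exporting to an `ArrowArray` and importing it back.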



##########
java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    } else {
+      currentReader = null;
+    }
+  }
+
+  @Override
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (currentReader == null) return false;
+    Boolean result = currentReader.loadNextBatch();
+
+    if (!result) {
+      try {
+        currentTask.close();
+        currentReader.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      while (!result) {
+        if (!taskIterator.hasNext()) {
+          return false;
+        } else {
+          currentTask = taskIterator.next();
+          currentReader = currentTask.execute();
+          result = currentReader.loadNextBatch();
+        }
+      }
+    }
+
+    // Load the currentReader#VectorSchemaRoot to ArrowArray
+    VectorSchemaRoot vsr = currentReader.getVectorSchemaRoot();
+    ArrowArray array = ArrowArray.allocateNew(allocator);
+    Data.exportVectorSchemaRoot(allocator, vsr, currentReader, array);
+
+    // Load the ArrowArray into ArrowScannerReader#VectorSchemaRoot
+    CDataDictionaryProvider provider = new CDataDictionaryProvider();
+    Data.importIntoVectorSchemaRoot(allocator,
+        array, this.getVectorSchemaRoot(), provider);
+    array.close();
+    provider.close();
+    return true;
+  }
+
+  @Override
+  public long bytesRead() {
+    return 0L;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close(true);
+      currentTask.close();
+      currentReader.close();
+      scanner.close();

Review Comment:
   The base `close()` calls `closeReadSource()`, so these `close()` calls could just be put there instead.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestDatasetFileWriter extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  /**
+   * Reads a small Parquet file through a scanner-backed reader, writes it back out as Parquet,
+   * and checks that the first written file matches the source.
+   */
+  @Test
+  public void testParquetWriteSimple() throws Exception {
+    final ParquetWriteSupport source = ParquetWriteSupport.writeTempFile(
+        AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 2, "b", 3, "c", 2, "d");
+    final String sourceUri = source.getOutputURI();
+    final FileSystemDatasetFactory datasetFactory = new FileSystemDatasetFactory(
+        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, sourceUri);
+    final ScanOptions scanOptions = new ScanOptions(new String[0], 100);
+    final Dataset sourceDataset = datasetFactory.finish();
+    final Scanner sourceScanner = sourceDataset.newScan(scanOptions);
+    final ArrowScannerReader scannerReader = new ArrowScannerReader(sourceScanner, rootAllocator());
+    final File outputDir = TMP.newFolder();
+    final String outputUri = outputDir.toURI().toString();
+    try {
+      DatasetFileWriter.write(rootAllocator(), scannerReader, FileFormat.PARQUET, outputUri);
+      // Compare the source against the first file found in the output directory.
+      final File[] writtenFiles = Objects.requireNonNull(outputDir.listFiles());
+      final String writtenUri = writtenFiles[0].toURI().toString();
+      assertParquetFileEquals(sourceUri, writtenUri);
+    } finally {
+      AutoCloseables.close(datasetFactory, sourceScanner, scannerReader, sourceDataset);
+    }
+  }
+
+  /**
+   * Writes a small Parquet dataset partitioned by {@code id} and {@code name} and checks that
+   * the expected set of partition files is produced under the output directory.
+   */
+  @Test
+  public void testParquetWriteWithPartitions() throws Exception {
+    final ParquetWriteSupport source = ParquetWriteSupport.writeTempFile(
+        AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 2, "b", 3, "c", 2, "d");
+    final String sourceUri = source.getOutputURI();
+    final FileSystemDatasetFactory datasetFactory = new FileSystemDatasetFactory(
+        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, sourceUri);
+    final ScanOptions scanOptions = new ScanOptions(new String[0], 100);
+    final Dataset sourceDataset = datasetFactory.finish();
+    final Scanner sourceScanner = sourceDataset.newScan(scanOptions);
+    final ArrowScannerReader scannerReader = new ArrowScannerReader(sourceScanner, rootAllocator());
+    final File outputDir = TMP.newFolder();
+    final String outputUri = outputDir.toURI().toString();
+    try {
+      DatasetFileWriter.write(rootAllocator(), scannerReader, FileFormat.PARQUET, outputUri,
+          new String[]{"id", "name"}, 100, "dat_{i}");
+      final Set<String> expectedPaths = new HashSet<>(Arrays.asList(
+          "id=1/name=a/dat_0", "id=2/name=b/dat_0", "id=3/name=c/dat_0", "id=2/name=d/dat_0"));
+      // Collect every written file as a path relative to the output directory.
+      final Set<String> actualPaths = FileUtils.listFiles(outputDir, null, true)
+          .stream()
+          .map(written -> outputDir.toURI().relativize(written.toURI()).toString())
+          .collect(Collectors.toSet());
+      Assert.assertEquals(expectedPaths, actualPaths);
+    } finally {
+      AutoCloseables.close(datasetFactory, sourceScanner, scannerReader, sourceDataset);
+    }
+  }
+
+  private void assertParquetFileEquals(String expectedURI, String actualURI) throws Exception {
+    final FileSystemDatasetFactory expectedFactory = new FileSystemDatasetFactory(
+        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, expectedURI);
+    List<ArrowRecordBatch> expectedBatches = collectResultFromFactory(expectedFactory,
+        new ScanOptions(new String[0], 100));
+    final FileSystemDatasetFactory actualFactory = new FileSystemDatasetFactory(
+        rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, actualURI);
+    List<ArrowRecordBatch> actualBatches = collectResultFromFactory(actualFactory,
+        new ScanOptions(new String[0], 100));
+    // fast-fail by comparing metadata
+    Assert.assertEquals(expectedBatches.toString(), actualBatches.toString());
+    // compare buffers
+    Assert.assertEquals(serialize(expectedBatches), serialize(actualBatches));

Review Comment:
   I'm not sure comparing the serialized forms is expected to be stable. VectorSchemaRoot has an equals() method; how do other tests do this?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
+import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.util.SchemaUtility;
+
+import java.util.Iterator;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Scan over an input {@link Scanner} then write all record batches to file.

Review Comment:
   An overload accepting Scanner may make sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org