You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/06/19 10:50:41 UTC

[1/3] drill git commit: DRILL-5589: JDBC client crashes after successful authentication if trace logging is enabled

Repository: drill
Updated Branches:
  refs/heads/master a7e298760 -> be43a9edd


DRILL-5589: JDBC client crashes after successful authentication if trace logging is enabled

closes #854


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/01bcb787
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/01bcb787
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/01bcb787

Branch: refs/heads/master
Commit: 01bcb787493b218f80a8e66305d2e272ee004e1d
Parents: a7e2987
Author: Sorabh Hamirwasia <sh...@maprtech.com>
Authored: Thu Jun 15 11:00:21 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:18:42 2017 +0300

----------------------------------------------------------------------
 .../drill/exec/rpc/security/AuthenticationOutcomeListener.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/01bcb787/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
index 7f51142..5c34d01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -121,6 +121,8 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
       completionListener.failed(RpcException.mapException(
           new SaslException("Server sent a corrupt message.")));
     } else {
+      // SaslSuccessProcessor.process disposes saslClient so get mechanism here to use later in logging
+      final String mechanism = connection.getSaslClient().getMechanismName();
       try {
         final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection);
         final SaslMessage saslResponse = processor.process(context);
@@ -134,12 +136,12 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
           completionListener.success(null, null);
           if (logger.isTraceEnabled()) {
             logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}",
-                connection.getSaslClient().getMechanismName(), connection.getEncryptionCtxtString());
+                mechanism, connection.getEncryptionCtxtString());
           }
         }
       } catch (final Exception e) {
         logger.error("Authentication with encryption context: {} using mechanism {} failed with {}",
-            connection.getEncryptionCtxtString(), connection.getSaslClient().getMechanismName(), e.getMessage());
+            connection.getEncryptionCtxtString(), mechanism, e.getMessage());
         completionListener.failed(RpcException.mapException(e));
       }
     }


[2/3] drill git commit: DRILL-5544: Out of heap running CTAS against text delimited

Posted by ar...@apache.org.
DRILL-5544: Out of heap running CTAS against text delimited

The Parquet library's version of PageWriter does not allow direct memory to be used for allocating ByteBuffers,
so this PR introduces alternative versions of PageWriter and PageWriteStore.
See more: https://issues.apache.org/jira/browse/PARQUET-1006.

closes #846


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b714b2d7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b714b2d7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b714b2d7

Branch: refs/heads/master
Commit: b714b2d74f80755d02de87b3151d94cb9cfc6794
Parents: 01bcb78
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Thu May 25 17:10:55 2017 +0000
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:19:55 2017 +0300

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetRecordWriter.java |  61 ++--
 .../ColumnChunkPageWriteStoreExposer.java       |  47 ----
 .../ParquetColumnChunkPageWriteStore.java       | 280 +++++++++++++++++++
 .../physical/impl/writer/TestParquetWriter.java |   4 +-
 4 files changed, 318 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index bc495a3..1d4d161 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -50,12 +50,12 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
-import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ColumnChunkPageWriteStoreExposer;
+import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.ColumnIOFactory;
@@ -100,7 +100,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
   private ColumnWriteStore store;
-  private PageWriteStore pageStore;
+  private ParquetColumnChunkPageWriteStore pageStore;
 
   private RecordConsumer consumer;
   private BatchSchema batchSchema;
@@ -206,11 +206,21 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
     schema = new MessageType("root", types);
 
+    // We don't want this number to be too small, ideally we divide the block equally across the columns.
+    // It is unlikely all columns are going to be the same size.
+    // Its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type.
+    // Therefore this size is cast to int, since the underlying layer allocates a byte array
+    // and must keep the array size within the int range.
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
-    pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
-        codecFactory.getCompressor(codec),
-        schema);
+    // We don't want this number to be too small either. Ideally, slightly bigger than the page size,
+    // but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
+    // TODO: Use initialSlabSize from ParquetProperties once drill will be updated to the latest version of Parquet library
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
+    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
+    // once PARQUET-1006 will be resolved
+    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
+        pageSize, new ParquetDirectByteBufferAllocator(oContext));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
         writerVersion, new ParquetDirectByteBufferAllocator(oContext));
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
@@ -263,26 +273,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   private void flush() throws IOException {
-    if (recordCount > 0) {
-      parquetFileWriter.startBlock(recordCount);
-      consumer.flush();
-      store.flush();
-      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-
-      // we are writing one single block per file
-      parquetFileWriter.end(extraMetaData);
-      parquetFileWriter = null;
-    }
-
-    store.close();
-    // TODO(jaltekruse) - review this close method should no longer be necessary
-//    ColumnChunkPageWriteStoreExposer.close(pageStore);
+    try {
+      if (recordCount > 0) {
+        parquetFileWriter.startBlock(recordCount);
+        consumer.flush();
+        store.flush();
+        pageStore.flushToFileWriter(parquetFileWriter);
+        recordCount = 0;
+        parquetFileWriter.endBlock();
+
+        // we are writing one single block per file
+        parquetFileWriter.end(extraMetaData);
+        parquetFileWriter = null;
+      }
+    } finally {
+      store.close();
+      pageStore.close();
 
-    store = null;
-    pageStore = null;
-    index++;
+      store = null;
+      pageStore = null;
+      index++;
+    }
   }
 
   private void checkBlockSizeReached() throws IOException {

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
deleted file mode 100644
index 564a0a4..0000000
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.parquet.hadoop;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-
-import org.apache.parquet.column.page.PageWriteStore;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
-import org.apache.parquet.schema.MessageType;
-
-public class ColumnChunkPageWriteStoreExposer {
-
-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
-      OperatorContext oContext,
-      BytesCompressor compressor,
-      MessageType schema
-      ) {
-    return new ColumnChunkPageWriteStore(compressor, schema, new ParquetDirectByteBufferAllocator(oContext));
-  }
-
-  public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException {
-    ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
-  }
-
-  // TODO(jaltekruse) - review, this used to have a method for closing a pageStore
-  // the parquet code once rebased did not include this close method, make sure it isn't needed
-  // I might have messed up the merge
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..2d8e27d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a copy of ColumnChunkPageWriteStore from the Parquet library, except for the OutputStream that is used here.
+ * Using CapacityByteArrayOutputStream allows different ByteBuffer allocators to be used.
+ * This class will no longer be needed once PARQUET-1006 is resolved.
+ */
+public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap();
+  private final MessageType schema;
+
+  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
+                                          MessageType schema,
+                                          int initialSlabSize,
+                                          int maxCapacityHint,
+                                          ByteBufferAllocator allocator) {
+    this.schema = schema;
+    for (ColumnDescriptor path : schema.getColumns()) {
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator));
+    }
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
+  /**
+   * Writes the column chunks in the corresponding row group
+   * @param writer the parquet file writer
+   * @throws IOException if the file can not be created
+   */
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    for (ColumnDescriptor path : schema.getColumns()) {
+      ColumnChunkPageWriter pageWriter = writers.get(path);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+  @Override
+  public void close() {
+    for (ColumnChunkPageWriter pageWriter : writers.values()) {
+      pageWriter.close();
+    }
+  }
+
+  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final CapacityByteArrayOutputStream buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    // repetition and definition level encodings are used only for v1 pages and don't change
+    private Set<Encoding> rlEncodings = Sets.newHashSet();
+    private Set<Encoding> dlEncodings = Sets.newHashSet();
+    private List<Encoding> dataEncodings = Lists.newArrayList();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path,
+                                  BytesCompressor compressor,
+                                  int initialSlabSize,
+                                  int maxCapacityHint,
+                                  ByteBufferAllocator allocator) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
+      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      // Parquet library creates bad metadata if the uncompressed or compressed size of a page exceeds Integer.MAX_VALUE
+      if (uncompressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
+                uncompressedSize);
+      }
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      if (compressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+                + compressedSize);
+      }
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      compressedBytes.writeAllTo(buf);
+      rlEncodings.add(rlEncoding);
+      dlEncodings.add(dlEncoding);
+      dataEncodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePageV2(int rowCount,
+                            int nullCount,
+                            int valueCount,
+                            BytesInput repetitionLevels,
+                            BytesInput definitionLevels,
+                            Encoding dataEncoding,
+                            BytesInput data,
+                            Statistics<?> statistics) throws IOException {
+      int rlByteLength = toIntWithCheck(repetitionLevels.size());
+      int dlByteLength = toIntWithCheck(definitionLevels.size());
+      int uncompressedSize = toIntWithCheck(
+          data.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      BytesInput compressedData = compressor.compress(data);
+      int compressedSize = toIntWithCheck(
+          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize, compressedSize,
+          valueCount, nullCount, rowCount,
+          statistics,
+          dataEncoding,
+          rlByteLength,
+          dlByteLength,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+
+      definitionLevels.writeAllTo(buf);
+      compressedData.writeAllTo(buf);
+
+      dataEncodings.add(dataEncoding);
+    }
+
+    private int toIntWithCheck(long size) {
+      if (size > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+                size);
+      }
+      return (int)size;
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    /**
+     * Writes a number of pages within corresponding column chunk
+     * @param writer the parquet file writer
+     * @throws IOException if the file can not be created
+     */
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        // tracking the dictionary encoding is handled in writeDictionaryPage
+      }
+      List<Encoding> encodings = Lists.newArrayList();
+      encodings.addAll(rlEncodings);
+      encodings.addAll(dlEncodings);
+      encodings.addAll(dataEncodings);
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+      writer.endColumn();
+      logger.debug(
+          String.format(
+              "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+              buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, Sets.newHashSet(dataEncodings))
+              + (dictionaryPage != null ? String.format(
+              ", dic { %,d entries, %,dB raw, %,dB comp}",
+              dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+              : ""));
+      rlEncodings.clear();
+      dlEncodings.clear();
+      dataEncodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.getCapacity();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
+          dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+
+    @Override
+    public void close() {
+      buf.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b714b2d7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index e5c6ce4..3c174ae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -760,7 +760,7 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   /*
-  Test the reading of a binary field as drill varbinary where data is in dicationary _and_ non-dictionary encoded pages
+  Test the reading of a binary field as drill varbinary where data is in dictionary _and_ non-dictionary encoded pages
    */
   @Test
   public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception {
@@ -768,7 +768,7 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   /*
-  Test the reading of a binary field as drill timestamp where data is in dicationary _and_ non-dictionary encoded pages
+  Test the reading of a binary field as drill timestamp where data is in dictionary _and_ non-dictionary encoded pages
    */
   @Test
   @Ignore("relies on particular time zone, works for UTC")


[3/3] drill git commit: DRILL-5514: Enhance VectorContainer to merge two row sets

Posted by ar...@apache.org.
DRILL-5514: Enhance VectorContainer to merge two row sets

Adds ability to merge two schemas and to merge two vector containers,
in each case producing a new, merged result. See DRILL-5514 for details.

Also provides a handy constructor to create a vector container given a
pre-defined schema.

closes #837


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be43a9ed
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be43a9ed
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be43a9ed

Branch: refs/heads/master
Commit: be43a9edd148ef3af6f92c5ce7cda235c5ac1ad6
Parents: b714b2d
Author: Paul Rogers <pr...@maprtech.com>
Authored: Mon May 15 15:59:35 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Jun 19 12:22:10 2017 +0300

----------------------------------------------------------------------
 .../apache/drill/exec/record/BatchSchema.java   |  28 +++++
 .../drill/exec/record/VectorContainer.java      |  54 +++++++-
 .../drill/exec/record/TestVectorContainer.java  | 126 +++++++++++++++++++
 .../apache/drill/test/rowSet/DirectRowSet.java  |   5 +
 .../drill/test/rowSet/HyperRowSetImpl.java      |   5 +
 .../drill/test/rowSet/IndirectRowSet.java       |   5 +
 .../org/apache/drill/test/rowSet/RowSet.java    |   2 +
 7 files changed, 221 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index e9dcd28..63dcdb45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -157,4 +158,31 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return true;
   }
 
+  /**
+   * Merge two schemas to produce a new, merged schema. The caller is responsible
+   * for ensuring that column names are unique. The order of the fields in the
+   * new schema is the same as that of this schema, with the other schema's fields
+   * appended in the order defined in the other schema.
+   * <p>
+   * Merging data with selection vectors is unlikely to be useful, or work well.
+   * With a selection vector, the two record batches would have to be correlated
+   * both in their selection vectors AND in the underlying vectors. Such a use case
+   * is hard to imagine. So, for now, this method forbids merging schemas if either
+   * of them carries a selection vector. If we discover a meaningful use case, we can
+   * revisit the issue.
+   * @param otherSchema the schema to merge with this one
+   * @return the new, merged, schema
+   */
+
+  public BatchSchema merge(BatchSchema otherSchema) {
+    if (selectionVectorMode != SelectionVectorMode.NONE ||
+        otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
+      throw new IllegalArgumentException("Cannot merge schemas with selection vectors");
+    }
+    List<MaterializedField> mergedFields =
+        new ArrayList<>(fields.size() + otherSchema.fields.size());
+    mergedFields.addAll(this.fields);
+    mergedFields.addAll(otherSchema.fields);
+    return new BatchSchema(selectionVectorMode, mergedFields);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 69e04ac..54a04bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -60,6 +60,28 @@ public class VectorContainer implements VectorAccessible {
     this.allocator = allocator;
   }
 
+  /**
+   * Create a new vector container given a pre-defined schema. Creates the
+   * corresponding vectors, but does not allocate memory for them. Call
+   * {@link #allocateNew()} or {@link #allocateNewSafe()} to allocate
+   * memory.
+   * <p>
+   * Note that this method does the equivalent of {@link #buildSchema(SelectionVectorMode)}
+   * using the schema provided.
+   *
+   * @param allocator allocator to be used to allocate memory later
+   * @param schema the schema that defines the vectors to create
+   */
+
+  public VectorContainer(BufferAllocator allocator, BatchSchema schema) {
+    this.allocator = allocator;
+    for (MaterializedField field : schema) {
+      addOrGet(field, null);
+    }
+    this.schema = schema;
+    schemaChanged = false;
+  }
+
   @Override
   public String toString() {
     return super.toString()
@@ -304,7 +326,6 @@ public class VectorContainer implements VectorAccessible {
     }
 
     return va.getChildWrapper(fieldIds);
-
   }
 
   private VectorWrapper<?> getValueAccessorById(int... fieldIds) {
@@ -375,9 +396,7 @@ public class VectorContainer implements VectorAccessible {
    * Clears the contained vectors.  (See {@link ValueVector#clear}).
    */
   public void zeroVectors() {
-    for (VectorWrapper<?> w : wrappers) {
-      w.clear();
-    }
+    VectorAccessibleUtilities.clear(this);
   }
 
   public int getNumberOfColumns() {
@@ -398,4 +417,31 @@ public class VectorContainer implements VectorAccessible {
     }
     return true;
   }
+
+  /**
+   * Merge two batches to create a single, combined, batch. Vectors
+   * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}.
+   * The two batches must have identical row counts. This container is the
+   * main part of the record batch; the other supplies new columns to merge.
+   * <p>
+   * Reference counts on the underlying buffers are <b>unchanged</b>: client
+   * code is assumed to abandon both inputs in favor of the merged container.
+   *
+   * @param otherContainer the container to merge with this one
+   * @return a new, merged, container
+   * @throws IllegalArgumentException if the two record counts differ
+   */
+  public VectorContainer merge(VectorContainer otherContainer) {
+    if (recordCount != otherContainer.recordCount) {
+      throw new IllegalArgumentException("Cannot merge containers with different record counts: "
+          + recordCount + " and " + otherContainer.recordCount);
+    }
+    VectorContainer merged = new VectorContainer(allocator);
+    merged.schema = schema.merge(otherContainer.schema);
+    merged.recordCount = recordCount;
+    merged.wrappers.addAll(wrappers);
+    merged.wrappers.addAll(otherContainer.wrappers);
+    merged.schemaChanged = false;
+    return merged;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
new file mode 100644
index 0000000..d7a59bf
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.*;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestVectorContainer extends DrillTest {
+
+  // TODO: Replace the following with an extension of SubOperatorTest class
+  // once that is available.
+
+  protected static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.standardFixture();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fixture.close();
+  }
+
+  /**
+   * Test of the ability to merge two schemas and to merge
+   * two vector containers. The merge is "horizontal", like
+   * a row-by-row join. Since each container is a list of
+   * vectors, we just combine the two lists to create the
+   * merged result.
+   */
+  @Test
+  public void testContainerMerge() {
+
+    // Simulated data from a reader
+
+    BatchSchema leftSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .build();
+    SingleRowSet left = fixture.rowSetBuilder(leftSchema)
+        .add(10, "fred")
+        .add(20, "barney")
+        .add(30, "wilma")
+        .build();
+
+    // Simulated "implicit" columns: row number and file name
+
+    BatchSchema rightSchema = new SchemaBuilder()
+        .add("x", MinorType.SMALLINT)
+        .add("y", MinorType.VARCHAR)
+        .build();
+    SingleRowSet right = fixture.rowSetBuilder(rightSchema)
+        .add(1, "foo.txt")
+        .add(2, "bar.txt")
+        .add(3, "dino.txt")
+        .build();
+
+    // The merge batch we expect to see
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("x", MinorType.SMALLINT)
+        .add("y", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .add(10, "fred", 1, "foo.txt")
+        .add(20, "barney", 2, "bar.txt")
+        .add(30, "wilma", 3, "dino.txt")
+        .build();
+
+    // Merge containers without selection vector
+
+    RowSet merged = fixture.wrap(
+        left.container().merge(right.container()));
+
+    RowSetComparison comparison = new RowSetComparison(expected);
+    comparison.verify(merged);
+
+    // Merge containers via row set facade
+
+    RowSet mergedRs = left.merge(right);
+    comparison.verifyAndClear(mergedRs);
+
+    // Add a selection vector. Merging is forbidden, in the present code,
+    // for batches that have a selection vector.
+
+    SingleRowSet leftIndirect = left.toIndirect();
+    try {
+      leftIndirect.merge(right);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected: the merge of a batch with a selection vector is rejected
+    }
+    leftIndirect.clear();
+    right.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 706db27..29a1702 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -233,4 +233,14 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public SelectionVector2 getSv2() { return null; }
+
+  /**
+   * Merges this row set's container with that of {@code other} and wraps
+   * the merged container in a new {@code DirectRowSet} created with this
+   * row set's allocator.
+   */
+  @Override
+  public RowSet merge(RowSet other) {
+    return new DirectRowSet(allocator, container().merge(other.container()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index c7cb1b2..afc2e6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -292,4 +292,14 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
   @Override
   public int rowCount() { return sv4.getCount(); }
+
+  /**
+   * Merges this row set's container with that of {@code other} and wraps
+   * the merged container in a new {@code HyperRowSetImpl} created with this
+   * row set's allocator and {@code sv4}.
+   */
+  @Override
+  public RowSet merge(RowSet other) {
+    return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index f90fbb7..17a0ac8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -122,4 +122,14 @@ public class IndirectRowSet extends AbstractSingleRowSet {
     RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
     return sizer.actualSize();
   }
+
+  /**
+   * Merges this row set's container with that of {@code other} and wraps
+   * the merged container in a new {@code IndirectRowSet} created with this
+   * row set's allocator and {@code sv2}.
+   */
+  @Override
+  public RowSet merge(RowSet other) {
+    return new IndirectRowSet(allocator, container().merge(other.container()), sv2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index d22139c..b6bbd4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -162,6 +162,15 @@ public interface RowSet {
 
   int size();
 
+  /**
+   * Merge this row set with another to produce a row set that carries the
+   * columns of both.
+   *
+   * @param other the row set to merge with this one
+   * @return the merged row set
+   */
+  RowSet merge(RowSet other);
+
   BatchSchema batchSchema();
 
   /**