Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/23 23:42:09 UTC

[1/5] drill git commit: DRILL-4935: Allow drillbits to advertise a configurable host address to ZooKeeper

Repository: drill
Updated Branches:
  refs/heads/master 35275536c -> 04fb0be19


DRILL-4935: Allow drillbits to advertise a configurable host address to
ZooKeeper

This closes #647


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c231735
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c231735
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c231735

Branch: refs/heads/master
Commit: 1c23173549ee53481316c85a65028f6ff9fb1b93
Parents: 3527553
Author: Harrison Mebane <ha...@svds.com>
Authored: Fri Oct 7 13:10:36 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Nov 21 17:32:09 2016 -0800

----------------------------------------------------------------------
 distribution/src/resources/drill-env.sh                  |  4 ++++
 .../org/apache/drill/exec/service/ServiceEngine.java     | 11 ++++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1c231735/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 89a0c1e..cd11b47 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -62,6 +62,10 @@
 
 #export DRILLBIT_CODE_CACHE_SIZE=${DRILLBIT_CODE_CACHE_SIZE:-"1G"}
 
+# Provide a customized host name when the default mechanism is not accurate
+
+#export DRILL_HOST_NAME=`hostname`
+
 # Base name for Drill log files. Files are named ${DRILL_LOG_NAME}.out, etc.
 
 # DRILL_LOG_NAME="drillbit"

http://git-wip-us.apache.org/repos/asf/drill/blob/1c231735/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d505546..9b7b6c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -140,9 +140,18 @@ public class ServiceEngine implements AutoCloseable {
         name, context.getConfig().getLong(initReservation), context.getConfig().getLong(maxAllocation));
   }
 
+  private String getHostName() throws UnknownHostException {
+    // DRILL_HOST_NAME sets a custom host name. See drill-env.sh for details.
+    String customHost = System.getenv("DRILL_HOST_NAME");
+    if (customHost != null) {
+      return customHost;
+    }
+    return useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
+  }
+
   public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException{
     int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting);
-    String address = useIP ?  InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
+    String address = getHostName();
     checkLoopbackAddress(address);
 
     DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
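
With both hunks in place, an operator pins the address a drillbit advertises to ZooKeeper by exporting DRILL_HOST_NAME before startup. A minimal sketch for drill-env.sh, assuming a multi-homed host where the default lookup picks the wrong interface (the interface name and the `ip` pipeline are illustrative, not part of this commit):

    # Advertise the address bound to eth1 instead of whatever `hostname` resolves to
    export DRILL_HOST_NAME=$(ip -4 -o addr show eth1 | awk '{print $4}' | cut -d/ -f1)

When DRILL_HOST_NAME is unset, getHostName() falls through to the previous InetAddress-based lookup, so existing deployments are unaffected.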


[2/5] drill git commit: DRILL-4979: Make port of the DataConnection configurable

Posted by pa...@apache.org.
DRILL-4979: Make port of the DataConnection configurable

This closes #649


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2de3a27c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2de3a27c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2de3a27c

Branch: refs/heads/master
Commit: 2de3a27c3440429d9bf852b6595542ce2bd0fd1b
Parents: 1c23173
Author: Korn, Uwe <Uw...@blue-yonder.com>
Authored: Wed Nov 9 13:02:20 2016 +0100
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Nov 21 17:38:33 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/ExecConstants.java    | 1 +
 .../org/apache/drill/exec/rpc/data/DataConnectionCreator.java | 7 ++++++-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2de3a27c/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5f62781..bb451c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,6 +44,7 @@ public interface ExecConstants {
   String BIT_TIMEOUT = "drill.exec.bit.timeout" ;
   String SERVICE_NAME = "drill.exec.cluster-id";
   String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+  String INITIAL_DATA_PORT = "drill.exec.rpc.bit.server.dataport";
   String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
   String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
   String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";

http://git-wip-us.apache.org/repos/asf/drill/blob/2de3a27c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index a90d356..5c71b91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.data;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -59,7 +60,11 @@ public class DataConnectionCreator implements AutoCloseable {
 
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
     server = new DataServer(context, dataAllocator, workBus, bee);
-    int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
+    int port = partialEndpoint.getControlPort() + 1;
+    if (context.getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) {
+      port = context.getConfig().getInt(ExecConstants.INITIAL_DATA_PORT);
+    }
+    port = server.bind(port, allowPortHunting);
     DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
     return completeEndpoint;
   }
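
With this key available, the data port no longer has to float at control port + 1. A minimal sketch for drill-override.conf, with an illustrative port number:

    drill.exec: {
      rpc.bit.server.dataport: 31012
    }

When the key is absent, hasPath() returns false and the old control-port + 1 default applies; in both cases allowPortHunting still lets bind() advance past an occupied port.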


[3/5] drill git commit: DRILL-4980: Upgrade the approach to parquet date correctness detection - Added a Parquet writer version; - Updated the parquet date correctness detection method.

Posted by pa...@apache.org.
DRILL-4980: Upgrade the approach to parquet date correctness detection - Added a Parquet writer version; - Updated the parquet date correctness detection method.

This closes #644


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf29b917
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf29b917
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf29b917

Branch: refs/heads/master
Commit: cf29b917af4002a1ba0ef750ff3f5a553e910f8e
Parents: 2de3a27
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Wed Oct 26 12:22:06 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:29 2016 -0800

----------------------------------------------------------------------
 .../drill/exec/store/parquet/Metadata.java      |  61 ++++++++-----------
 .../store/parquet/ParquetReaderUtility.java     |  48 ++++++++++-----
 .../exec/store/parquet/ParquetRecordWriter.java |   4 +-
 .../drill/exec/store/parquet/ParquetWriter.java |  12 ++++
 .../TestCorruptParquetDateCorrection.java       |   6 +-
 .../0_0_1.parquet                               | Bin 290 -> 292 bytes
 .../0_0_2.parquet                               | Bin 290 -> 292 bytes
 .../0_0_3.parquet                               | Bin 290 -> 292 bytes
 .../0_0_4.parquet                               | Bin 290 -> 292 bytes
 .../0_0_5.parquet                               | Bin 290 -> 292 bytes
 .../0_0_6.parquet                               | Bin 290 -> 292 bytes
 .../4203_corrected_dates.parquet                | Bin 311 -> 313 bytes
 ...on_partitioned_metadata.requires_replace.txt |   2 +-
 13 files changed, 76 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 04a2476..7b3afb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -189,9 +189,9 @@ public class Metadata {
         childFiles.add(file);
       }
     }
-    ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(true);
+    ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(DrillVersionInfo.getVersion());
     if (childFiles.size() > 0) {
-      List<ParquetFileMetadata_v3> childFilesMetadata =
+      List<ParquetFileMetadata_v3 > childFilesMetadata =
           getParquetFileMetadata_v3(parquetTableMetadata, childFiles);
       metaDataList.addAll(childFilesMetadata);
       // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added
@@ -674,9 +674,6 @@ public class Metadata {
     @JsonIgnore public abstract ParquetTableMetadataBase clone();
 
     @JsonIgnore public abstract String getDrillVersion();
-
-    @JsonIgnore public abstract boolean isDateCorrect();
-
   }
 
   public static abstract class ParquetFileMetadata {
@@ -811,12 +808,6 @@ public class Metadata {
     public String getDrillVersion() {
       return null;
     }
-
-    @JsonIgnore @Override
-    public boolean isDateCorrect() {
-      return false;
-    }
-
   }
 
 
@@ -1025,31 +1016,29 @@ public class Metadata {
     @JsonProperty List<ParquetFileMetadata_v2> files;
     @JsonProperty List<String> directories;
     @JsonProperty String drillVersion;
-    @JsonProperty boolean isDateCorrect;
 
     public ParquetTableMetadata_v2() {
       super();
     }
 
-    public ParquetTableMetadata_v2(boolean isDateCorrect) {
-      this.drillVersion = DrillVersionInfo.getVersion();
-      this.isDateCorrect = isDateCorrect;
+    public ParquetTableMetadata_v2(String drillVersion) {
+      this.drillVersion = drillVersion;
     }
 
     public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
-        List<ParquetFileMetadata_v2> files, List<String> directories) {
+        List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) {
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
-      this.drillVersion = DrillVersionInfo.getVersion();
-      this.isDateCorrect = true;
+      this.drillVersion = drillVersion;
     }
 
     public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
-        ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo) {
+        ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) {
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = columnTypeInfo;
+      this.drillVersion = drillVersion;
     }
 
     public ColumnTypeMetadata_v2 getColumnTypeInfo(String[] name) {
@@ -1096,7 +1085,7 @@ public class Metadata {
     }
 
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
-      return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
+      return new ParquetTableMetadata_v2(files, directories, columnTypeInfo, drillVersion);
     }
 
     @JsonIgnore @Override
@@ -1104,10 +1093,6 @@ public class Metadata {
       return drillVersion;
     }
 
-    @JsonIgnore @Override public boolean isDateCorrect() {
-      return isDateCorrect;
-    }
-
   }
 
 
@@ -1383,30 +1368,38 @@ public class Metadata {
     @JsonProperty List<ParquetFileMetadata_v3> files;
     @JsonProperty List<String> directories;
     @JsonProperty String drillVersion;
-    @JsonProperty boolean isDateCorrect;
 
+    /**
+     * Default constructor needed for deserialization from Parquet Metadata Cache Files
+     * or for creating an empty instance of this class when the Metadata Cache File is absent
+     */
     public ParquetTableMetadata_v3() {
       super();
     }
 
-    public ParquetTableMetadata_v3(boolean isDateCorrect) {
-      this.drillVersion = DrillVersionInfo.getVersion();
-      this.isDateCorrect = isDateCorrect;
+    /**
+     * Used for creating the Parquet Metadata Cache File
+     * @param drillVersion  actual version of Apache Drill
+     */
+    public ParquetTableMetadata_v3(String drillVersion) {
+      this.drillVersion = drillVersion;
     }
 
     public ParquetTableMetadata_v3(ParquetTableMetadataBase parquetTable,
-        List<ParquetFileMetadata_v3> files, List<String> directories) {
+        List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) {
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v3) parquetTable).columnTypeInfo;
-      this.isDateCorrect = true;
+      this.drillVersion = drillVersion;
     }
 
     public ParquetTableMetadata_v3(List<ParquetFileMetadata_v3> files, List<String> directories,
-        ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo) {
+        ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo,
+        String drillVersion) {
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = columnTypeInfo;
+      this.drillVersion = drillVersion;
     }
 
     public ColumnTypeMetadata_v3 getColumnTypeInfo(String[] name) {
@@ -1453,7 +1446,7 @@ public class Metadata {
     }
 
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
-      return new ParquetTableMetadata_v3(files, directories, columnTypeInfo);
+      return new ParquetTableMetadata_v3(files, directories, columnTypeInfo, drillVersion);
     }
 
     @JsonIgnore @Override
@@ -1461,10 +1454,6 @@ public class Metadata {
       return drillVersion;
     }
 
-    @JsonIgnore @Override public boolean isDateCorrect() {
-      return isDateCorrect;
-    }
-
   }
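
With isDateCorrect removed, the cache-file format version itself carries the date-correctness signal: v3 cache files are only written by Drill builds that produce correct dates, and they now record the producing build in drillVersion. A minimal sketch of both sides of that contract, using names from this diff (the fromCache variable is illustrative):

    // Writing: stamp the cache file with the running Drill version (first hunk above)
    ParquetTableMetadata_v3 tableMetadata =
        new ParquetTableMetadata_v3(DrillVersionInfo.getVersion());

    // Reading: a v3 cache file alone proves the dates are fine; older cache
    // versions force value-based testing (see correctDatesInMetadataCache below)
    Metadata.ParquetTableMetadataBase fromCache = tableMetadata;
    DateCorruptionStatus status = fromCache instanceof Metadata.ParquetTableMetadata_v3
        ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
        : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;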
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 767c98d..b22e666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -59,19 +59,26 @@ public class ParquetReaderUtility {
    */
   public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
   /**
-   * All old parquet files (which haven't "is.date.correct=true" property in metadata) have
-   * a corrupt date shift: {@value} days or 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
+   * All old parquet files (which have neither the "is.date.correct=true" nor the "parquet-writer.version"
+   * property in their metadata) have a corrupt date shift: {@value} days or 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
    */
   public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
-  // The year 5000 (or 1106685 day from Unix epoch) is chosen as the threshold for auto-detecting date corruption.
-  // This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
-  // be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
-  // On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
-  // something like 10,000 years in the past.
   private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
+  /**
+   * The year 5000 (or 1106685 day from Unix epoch) is chosen as the threshold for auto-detecting date corruption.
+   * This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
+   * be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
+   * On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
+   * something like 10,000 years in the past.
+   */
   public static final int DATE_CORRUPTION_THRESHOLD =
       (int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
-
+  /**
+   * Version 2 (and later) of the Drill Parquet writer uses the date format described in the
+   * <a href="https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md#date">Parquet spec</a>.
+   * Prior versions had dates formatted with {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#CORRECT_CORRUPT_DATE_SHIFT}
+   */
+  public static final int DRILL_WRITER_VERSION_STD_DATE_FORMAT = 2;
   /**
    * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
    * based on the file metadata. For older files that lack statistics we must actually test the values
@@ -130,10 +137,9 @@ public class ParquetReaderUtility {
   }
 
   public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
-    boolean isDateCorrect = parquetTableMetadata.isDateCorrect();
-    DateCorruptionStatus cacheFileContainsCorruptDates = isDateCorrect ?
-        DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_SHOWS_CORRUPTION;
-    if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
+    DateCorruptionStatus cacheFileCanContainsCorruptDates = parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3 ?
+        DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
+    if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
       // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
       String[] names = new String[0];
       if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
@@ -189,9 +195,20 @@ public class ParquetReaderUtility {
 
     String createdBy = footer.getFileMetaData().getCreatedBy();
     String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
-    String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
+    String stringWriterVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.WRITER_VERSION_PROPERTY);
+    // This flag may be present in parquet files generated by the 1.9.0-SNAPSHOT and 1.9.0 Drill versions.
+    // If the flag is present, the Drill parquet writer version is 2.
+    final String isDateCorrectFlag = "is.date.correct";
+    String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(isDateCorrectFlag);
     if (drillVersion != null) {
-      return Boolean.valueOf(isDateCorrect) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
+      int writerVersion = 1;
+      if (stringWriterVersion != null) {
+        writerVersion = Integer.parseInt(stringWriterVersion);
+      } else if (Boolean.valueOf(isDateCorrect)) {
+        writerVersion = DRILL_WRITER_VERSION_STD_DATE_FORMAT;
+      }
+      return writerVersion >= DRILL_WRITER_VERSION_STD_DATE_FORMAT ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
           : DateCorruptionStatus.META_SHOWS_CORRUPTION;
     } else {
       // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
@@ -261,7 +278,8 @@ public class ParquetReaderUtility {
         // this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
         // creating a NameSegment makes sure we are using the standard code for comparing names,
         // currently it is all case-insensitive
-        if (AbstractRecordReader.isStarQuery(columns) || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+        if (AbstractRecordReader.isStarQuery(columns)
+            || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
           int colIndex = -1;
           ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
           if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
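
The footer check in detectCorruptDates now reduces to resolving a numeric writer version. A standalone sketch of the resolution order, with a hypothetical helper name (the committed code inlines this logic):

    /** Resolve the effective Drill parquet writer version from footer key-value metadata. */
    static int resolveWriterVersion(String writerVersionProp, String isDateCorrectProp) {
      if (writerVersionProp != null) {
        return Integer.parseInt(writerVersionProp);  // files written with the new property
      }
      if (Boolean.valueOf(isDateCorrectProp)) {
        return 2;  // 1.9.0-SNAPSHOT files carried is.date.correct=true instead of a version
      }
      return 1;    // older Drill files: dates are known to be shifted
    }

Versions at or above DRILL_WRITER_VERSION_STD_DATE_FORMAT (2) map to META_SHOWS_NO_CORRUPTION and anything lower to META_SHOWS_CORRUPTION, while files with no drill.version property at all still fall back to the column-statistics check.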

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index bc6bb65..4ee863a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -78,7 +78,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
   public static final String DRILL_VERSION_PROPERTY = "drill.version";
-  public static final String IS_DATE_CORRECT_PROPERTY = "is.date.correct";
+  public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
 
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
@@ -116,7 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.partitionColumns = writer.getPartitionColumns();
     this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
     this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
-    this.extraMetaData.put(IS_DATE_CORRECT_PROPERTY, "true");
+    this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 49c231e..716c56d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -40,6 +40,18 @@ import com.google.common.base.Preconditions;
 public class ParquetWriter extends AbstractWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriter.class);
 
+  /** Version of Drill's Parquet writer. Increment this version (by 1) any time we make any format change to the file.
+   * Format changes include:
+   * <ul>
+   * <li>Supporting new data types,
+   * <li>Changes to the format of data fields,
+   * <li>Adding new metadata to the file footer, etc.
+   * </ul>
+   * Newer readers must be able to read old files. The Writer version tells the Parquet reader how to interpret fields
+   * or metadata when that data changes format from one writer version to another.
+   */
+  public static final int WRITER_VERSION = 2;
+
   private final String location;
   private final List<String> partitionColumns;
   private final ParquetFormatPlugin formatPlugin;

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index 27cc093..0ab247d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -67,8 +67,8 @@ import java.util.regex.Pattern;
 public class TestCorruptParquetDateCorrection extends PlanTestBase {
 
   // 4 files are in the directory:
-  //    - one created with the fixed version of the reader
-  //        - files have extra meta field: is.date.correct = true
+  //    - one created with the parquet-writer version number of "2"
+  //        - files have extra meta field: parquet-writer.version = 2
  //    - one from an old version of Drill, before we put a proper created-by entry in the metadata
   //        - this is read properly by looking at a Max value in the file statistics, to see that
   //          it is way off of a typical date value
@@ -123,7 +123,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
 
   /**
    * Test reading a directory full of partitioned parquet files with dates, these files have a drill version
-   * number of 1.9.0-SNAPSHOT and is.date.correct = true label in their footers, so we can be certain
+   * number of "1.9.0-SNAPSHOT" and parquet-writer version number of "2" in their footers, so we can be certain
    * they do not have corruption. The option to disable the correction is passed, but it will not change the result
    * in the case where we are certain correction is NOT needed. For more info see DRILL-4203.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
index 0ec76c8..34224c1 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
index b43ab9d..8ce72e1 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
index 95d46bb..b96fa61 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
index 9f7269e..3c5a13a 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
index b0d370a..3a6db56 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
index a5c7b86..6e053ee 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
index 3d46f56..7a461d0 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
index bb6e282..54f18a8 100644
--- a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
+++ b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
@@ -297,5 +297,5 @@
     } ]
   } ],
   "directories" : [ "file:REPLACED_IN_TEST/mixed_partitioned/1_9_0_partitioned_no_corruption", "file:REPLACED_IN_TEST/mixed_partitioned/partitioned_with_corruption_4203" ],
-  "isDateCorrect" : "true"
+  "drillVersion" : "1.9.0-SNAPSHOT"
 }


[5/5] drill git commit: DRILL-4995: Allow lazy init when dynamic UDF support is disabled

Posted by pa...@apache.org.
DRILL-4995: Allow lazy init when dynamic UDF support is disabled

This closes #645


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/04fb0be1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/04fb0be1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/04fb0be1

Branch: refs/heads/master
Commit: 04fb0be191ef09409c00ca7173cb903dfbe2abb0
Parents: 3f38118
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Nov 3 16:20:04 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:30 2016 -0800

----------------------------------------------------------------------
 .../expr/fn/FunctionImplementationRegistry.java | 79 ++++++++++----------
 .../drill/exec/planner/sql/DrillSqlWorker.java  | 15 ++--
 .../org/apache/drill/TestDynamicUDFSupport.java | 25 +++++++
 .../record/ExpressionTreeMaterializerTest.java  | 10 +++
 4 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 988a9f6..d0aa33f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -140,9 +140,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
   }
 
   /**
-   * Using the given <code>functionResolver</code> find Drill function implementation for given
-   * <code>functionCall</code>
-   * If function implementation was not found and in case if Dynamic UDF Support is enabled
+   * Using the given <code>functionResolver</code>,
+   * finds the Drill function implementation for the given <code>functionCall</code>.
+   * If the function implementation was not found,
    * loads all missing remote functions and tries to find Drill implementation one more time.
    */
   @Override
@@ -154,12 +154,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
     AtomicLong version = new AtomicLong();
     DrillFuncHolder holder = functionResolver.getBestMatch(
         localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall);
-    if (holder == null && retry) {
-      if (optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
-        if (loadRemoteFunctions(version.get())) {
-          return findDrillFunction(functionResolver, functionCall, false);
-        }
-      }
+    if (holder == null && retry && loadRemoteFunctions(version.get())) {
+      return findDrillFunction(functionResolver, functionCall, false);
     }
     return holder;
   }
@@ -183,7 +179,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
 
   /**
    * Find the Drill function implementation that matches the name, arg types and return type.
-   * If exact function implementation was not found and in case if Dynamic UDF Support is enabled
+   * If an exact function implementation was not found,
    * loads all missing remote functions and tries to find Drill implementation one more time.
    */
   public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
@@ -197,11 +193,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
         return h;
       }
     }
-
-    if (retry && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
-      if (loadRemoteFunctions(version.get())) {
-        return findExactMatchingDrillFunction(name, argTypes, returnType, false);
-      }
+    if (retry && loadRemoteFunctions(version.get())) {
+      return findExactMatchingDrillFunction(name, argTypes, returnType, false);
     }
     return null;
   }
@@ -287,35 +280,39 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
     if (!missingJars.isEmpty()) {
       synchronized (this) {
         missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
-        List<JarScan> jars = Lists.newArrayList();
-        for (String jarName : missingJars) {
-          Path binary = null;
-          Path source = null;
-          URLClassLoader classLoader = null;
-          try {
-            binary = copyJarToLocal(jarName, remoteFunctionRegistry);
-            source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
-            URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
-            classLoader = new URLClassLoader(urls);
-            ScanResult scanResult = scan(classLoader, binary, urls);
-            localFunctionRegistry.validate(jarName, scanResult);
-            jars.add(new JarScan(jarName, scanResult, classLoader));
-          } catch (Exception e) {
-            deleteQuietlyLocalJar(binary);
-            deleteQuietlyLocalJar(source);
-            if (classLoader != null) {
-              try {
-                classLoader.close();
-              } catch (Exception ex) {
-                logger.warn("Problem during closing class loader for {}", jarName, e);
+        if (!missingJars.isEmpty()) {
+          logger.info("Starting dynamic UDFs lazy-init process.\n" +
+              "The following jars are going to be downloaded and registered locally: " + missingJars);
+          List<JarScan> jars = Lists.newArrayList();
+          for (String jarName : missingJars) {
+            Path binary = null;
+            Path source = null;
+            URLClassLoader classLoader = null;
+            try {
+              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
+              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
+              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+              classLoader = new URLClassLoader(urls);
+              ScanResult scanResult = scan(classLoader, binary, urls);
+              localFunctionRegistry.validate(jarName, scanResult);
+              jars.add(new JarScan(jarName, scanResult, classLoader));
+            } catch (Exception e) {
+              deleteQuietlyLocalJar(binary);
+              deleteQuietlyLocalJar(source);
+              if (classLoader != null) {
+                try {
+                  classLoader.close();
+                } catch (Exception ex) {
+                  logger.warn("Problem during closing class loader for {}", jarName, e);
+                }
               }
+              logger.error("Problem during remote functions load from {}", jarName, e);
             }
-            logger.error("Problem during remote functions load from {}", jarName, e);
           }
-        }
-        if (!jars.isEmpty()) {
-          localFunctionRegistry.register(jars);
-          return true;
+          if (!jars.isEmpty()) {
+            localFunctionRegistry.register(jars);
+            return true;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 19123d0..76529d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,7 +24,6 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.FunctionNotFoundException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.QueryContext;
@@ -113,7 +112,7 @@ public class DrillSqlWorker {
 
   /**
    * Returns query physical plan.
-   * In case of {@link FunctionNotFoundException} and dynamic udf support is enabled, attempts to load remote functions.
+   * In case of {@link FunctionNotFoundException}, attempts to load remote functions.
   * If at least one function was loaded or the local function registry version has changed,
    * makes one more attempt to get query physical plan.
    */
@@ -122,13 +121,11 @@ public class DrillSqlWorker {
     try {
       return handler.getPlan(sqlNode);
     } catch (FunctionNotFoundException e) {
-      if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
-        DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
-        FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
-        if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
-          drillOperatorTable.reloadOperators(functionRegistry);
-          return handler.getPlan(sqlNode);
-        }
+      DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
+      FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
+      if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
+        drillOperatorTable.reloadOperators(functionRegistry);
+        return handler.getPlan(sqlNode);
       }
       throw e;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
index ae73a3d..4676e39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
@@ -339,6 +339,31 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
   }
 
   @Test
+  public void testLazyInitWhenDynamicUdfSupportIsDisabled() throws Exception {
+    try {
+      test("select custom_lower('A') from (values(1))");
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString("No match found for function signature custom_lower(<CHARACTER>)"));
+    }
+
+    copyDefaultJarsToStagingArea();
+    test("create function using jar '%s'", default_binary_name);
+
+    try {
+      testBuilder()
+          .sqlQuery("select custom_lower('A') as res from (values(1))")
+          .optionSettingQueriesForTestQuery("alter system set `exec.udf.enable_dynamic_support` = false")
+          .unOrdered()
+          .baselineColumns("res")
+          .baselineValues("a")
+          .go();
+    } finally {
+      test("alter system reset `exec.udf.enable_dynamic_support`");
+    }
+  }
+
+  @Test
   public void testDropFunction() throws Exception {
     copyDefaultJarsToStagingArea();
     test("create function using jar '%s'", default_binary_name);

http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 7d28c9b..8b338af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
 import mockit.NonStrictExpectations;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -42,6 +44,8 @@ import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -196,6 +200,12 @@ public class ExpressionTreeMaterializerTest extends ExecTest {
       }
     };
 
+    new MockUp<RemoteFunctionRegistry>() {
+      @Mock
+      Registry getRegistry() {
+        return Registry.getDefaultInstance();
+      }
+    };
 
     LogicalExpression functionCallExpr = new FunctionCall("testFunc",
       ImmutableList.of((LogicalExpression) new FieldReference("test", ExpressionPosition.UNKNOWN) ),


[4/5] drill git commit: DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException

Posted by pa...@apache.org.
DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException

This closes #653


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3f381181
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3f381181
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3f381181

Branch: refs/heads/master
Commit: 3f3811818ecc3bbf6f307a408c30f0406fadc703
Parents: cf29b91
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Thu Oct 20 05:03:13 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:29 2016 -0800

----------------------------------------------------------------------
 .../drill/exec/store/parquet/Metadata.java      | 68 +++++++++++++++++---
 1 file changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3f381181/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 7b3afb4..d819562 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
+import java.util.UUID;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +41,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -209,11 +213,17 @@ public class Metadata {
     for (String oldname : OLD_METADATA_FILENAMES) {
       fs.delete(new Path(p, oldname), false);
     }
-    writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
+    // writeFile creates and writes to a tmp file first and then renames it
+    // to the final metadata cache file name. We want the UUID appended to the tmp file
+    // to be the same for METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME
+    // so we can track/debug things better.
+    // Generate the UUID used for tmp file creation here.
+    UUID tmpUUID = UUID.randomUUID();
+    writeFile(parquetTableMetadata, path, tmpUUID);
 
     if (directoryList.size() > 0 && childFiles.size() == 0) {
       ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
-      writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
+      writeFile(parquetTableMetadataDirs, path, tmpUUID);
       logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
@@ -489,13 +499,35 @@ public class Metadata {
   }
 
   /**
+   * Renames Path srcPath to Path dstPath.
+   *
+   * @param srcPath the temporary file to rename
+   * @param dstPath the final metadata cache file path
+   * @throws IOException if the rename fails
+   */
+  private void renameFile(Path srcPath, Path dstPath) throws IOException {
+    try {
+      // Use fileContext API as FileSystem rename is deprecated.
+      FileContext fileContext = FileContext.getFileContext(srcPath.toUri());
+      fileContext.rename(srcPath, dstPath, Options.Rename.OVERWRITE);
+    } catch (Exception e) {
+      logger.info("Metadata cache file rename from {} to {} failed", srcPath.toString(), dstPath.toString(), e);
+      throw new IOException("metadata cache file rename failed", e);
+    } finally {
+      if (fs.exists(srcPath)) {
+        fs.delete(srcPath, false);
+      }
+    }
+  }
+
+  /**
    * Serialize parquet metadata to json and write to a file
    *
    * @param parquetTableMetadata the metadata to serialize
    * @param path the directory in which to write the metadata cache file
    * @throws IOException if writing or the rename fails
    */
-  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p) throws IOException {
+  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, String path, UUID tmpUUID) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
     jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
@@ -503,23 +535,39 @@ public class Metadata {
     SimpleModule module = new SimpleModule();
     module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
     mapper.registerModule(module);
-    FSDataOutputStream os = fs.create(p);
+
+    // If multiple clients are updating metadata cache file concurrently, the cache file
+    // can get corrupted. To prevent this, write to a unique temporary file and then do
+    // atomic rename.
+    Path tmpPath = new Path(path, METADATA_FILENAME + "." + tmpUUID);
+    FSDataOutputStream os = fs.create(tmpPath);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
     os.flush();
     os.close();
+
+    Path finalPath = new Path(path, METADATA_FILENAME);
+    renameFile(tmpPath, finalPath);
   }
 
-  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p) throws IOException {
+  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, String path, UUID tmpUUID) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
     jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
     SimpleModule module = new SimpleModule();
     mapper.registerModule(module);
-    FSDataOutputStream os = fs.create(p);
+
+    // If multiple clients are updating metadata cache file concurrently, the cache file
+    // can get corrupted. To prevent this, write to a unique temporary file and then do
+    // atomic rename.
+    Path tmpPath = new Path(path, METADATA_DIRECTORIES_FILENAME + "." + tmpUUID);
+    FSDataOutputStream os = fs.create(tmpPath);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
     os.flush();
     os.close();
+
+    Path finalPath = new Path(path,  METADATA_DIRECTORIES_FILENAME);
+    renameFile(tmpPath, finalPath);
   }
 
   /**
@@ -562,8 +610,10 @@ public class Metadata {
       logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
+        // Do not remove the scheme and authority from the path passed to createMetaFilesRecursively,
+        // as we need the full path to obtain the proper fileContext in writeFile
         parquetTableMetadataDirs =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+            (createMetaFilesRecursively(p.getParent().toString())).getRight();
         newMetadata = true;
       }
     } else {
@@ -571,8 +621,10 @@ public class Metadata {
       logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
+        // Do not remove the scheme and authority from the path passed to createMetaFilesRecursively,
+        // as we need the full path to obtain the proper fileContext in writeFile
         parquetTableMetadata =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
+            (createMetaFilesRecursively(p.getParent().toString())).getLeft();
         newMetadata = true;
       }
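
The write path now follows the classic write-to-temp-then-rename discipline. A self-contained sketch of the same idea against the Hadoop APIs used above (class name, payload handling, and filesystem setup are illustrative, not the committed code):

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;

    public class AtomicCacheWrite {
      /** Writes payload to dir/name atomically: unique temp file first, then rename over the target. */
      static void atomicWrite(FileSystem fs, Path dir, String name, byte[] payload) throws IOException {
        Path tmpPath = new Path(dir, name + "." + UUID.randomUUID());
        try (FSDataOutputStream os = fs.create(tmpPath)) {
          os.write(payload);
        }
        // Rename with OVERWRITE replaces the target in one step, so a concurrent
        // reader sees either the old cache file or the new one, never a torn write.
        FileContext.getFileContext(tmpPath.toUri())
            .rename(tmpPath, new Path(dir, name), Options.Rename.OVERWRITE);
      }
    }

Two clients refreshing the same table still race, but the loser now overwrites the winner with an equally valid file instead of interleaving JSON fragments, which is the corruption behind the JsonParseException in DRILL-4831.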