Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/23 23:42:09 UTC
[1/5] drill git commit: DRILL-4935: Allow drillbits to advertise a configurable host address to Zookeeper
Repository: drill
Updated Branches:
refs/heads/master 35275536c -> 04fb0be19
DRILL-4935: Allow drillbits to advertise a configurable host address to Zookeeper
This closes #647
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c231735
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c231735
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c231735
Branch: refs/heads/master
Commit: 1c23173549ee53481316c85a65028f6ff9fb1b93
Parents: 3527553
Author: Harrison Mebane <ha...@svds.com>
Authored: Fri Oct 7 13:10:36 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Nov 21 17:32:09 2016 -0800
----------------------------------------------------------------------
distribution/src/resources/drill-env.sh | 4 ++++
.../org/apache/drill/exec/service/ServiceEngine.java | 11 ++++++++++-
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1c231735/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 89a0c1e..cd11b47 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -62,6 +62,10 @@
#export DRILLBIT_CODE_CACHE_SIZE=${DRILLBIT_CODE_CACHE_SIZE:-"1G"}
+# Provide a customized host name for when the default mechanism is not accurate
+
+#export DRILL_HOST_NAME=`hostname`
+
# Base name for Drill log files. Files are named ${DRILL_LOG_NAME}.out, etc.
# DRILL_LOG_NAME="drillbit"
http://git-wip-us.apache.org/repos/asf/drill/blob/1c231735/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d505546..9b7b6c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -140,9 +140,18 @@ public class ServiceEngine implements AutoCloseable {
name, context.getConfig().getLong(initReservation), context.getConfig().getLong(maxAllocation));
}
+ private String getHostName() throws UnknownHostException{
+ // DRILL_HOST_NAME sets custom host name. See drill-env.sh for details.
+ String customHost = System.getenv("DRILL_HOST_NAME");
+ if (customHost != null) {
+ return customHost;
+ }
+ return useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException{
int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting);
- String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
+ String address = getHostName();
checkLoopbackAddress(address);
DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
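For reference, the resolution logic introduced here is small enough to sketch standalone. The class below is illustrative (it is not part of the patch) but mirrors the same precedence: an explicit DRILL_HOST_NAME environment variable always wins; otherwise the JDK resolves either the local IP or the canonical host name.

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class HostNameResolver {
        // An explicit DRILL_HOST_NAME wins; see the drill-env.sh hook above.
        public static String resolve(boolean useIP) throws UnknownHostException {
            String customHost = System.getenv("DRILL_HOST_NAME");
            if (customHost != null) {
                return customHost;
            }
            InetAddress local = InetAddress.getLocalHost();
            return useIP ? local.getHostAddress() : local.getCanonicalHostName();
        }
    }

To enable the override, uncomment the export in drill-env.sh or set a literal address (for example, export DRILL_HOST_NAME=drillbit1.example.com, a placeholder host) before starting the drillbit. This is useful when a drillbit has multiple interfaces or sits behind address translation and the default resolution advertises an address other nodes cannot reach.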
[2/5] drill git commit: DRILL-4979: Make port of the DataConnection configurable
Posted by pa...@apache.org.
DRILL-4979: Make port of the DataConnection configurable
This closes #649
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2de3a27c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2de3a27c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2de3a27c
Branch: refs/heads/master
Commit: 2de3a27c3440429d9bf852b6595542ce2bd0fd1b
Parents: 1c23173
Author: Korn, Uwe <Uw...@blue-yonder.com>
Authored: Wed Nov 9 13:02:20 2016 +0100
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Nov 21 17:38:33 2016 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/ExecConstants.java | 1 +
.../org/apache/drill/exec/rpc/data/DataConnectionCreator.java | 7 ++++++-
2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2de3a27c/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5f62781..bb451c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,6 +44,7 @@ public interface ExecConstants {
String BIT_TIMEOUT = "drill.exec.bit.timeout" ;
String SERVICE_NAME = "drill.exec.cluster-id";
String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+ String INITIAL_DATA_PORT = "drill.exec.rpc.bit.server.dataport";
String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";
http://git-wip-us.apache.org/repos/asf/drill/blob/2de3a27c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index a90d356..5c71b91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.data;
import java.util.concurrent.ConcurrentMap;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -59,7 +60,11 @@ public class DataConnectionCreator implements AutoCloseable {
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
server = new DataServer(context, dataAllocator, workBus, bee);
- int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
+ int port = partialEndpoint.getControlPort() + 1;
+ if (context.getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) {
+ port = context.getConfig().getInt(ExecConstants.INITIAL_DATA_PORT);
+ }
+ port = server.bind(port, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
return completeEndpoint;
}
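Drill's configuration is backed by Typesafe Config, so the fallback above can be exercised in isolation. The sketch below is not Drill code, but it shows the same hasPath/getInt pattern: the data port defaults to the control port plus one unless drill.exec.rpc.bit.server.dataport is set explicitly.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class DataPortSelector {
        static final String INITIAL_DATA_PORT = "drill.exec.rpc.bit.server.dataport";

        // Default: control port + 1; an explicit config entry overrides it.
        static int choosePort(Config config, int controlPort) {
            int port = controlPort + 1;
            if (config.hasPath(INITIAL_DATA_PORT)) {
                port = config.getInt(INITIAL_DATA_PORT);
            }
            return port;
        }

        public static void main(String[] args) {
            Config explicit = ConfigFactory.parseString(INITIAL_DATA_PORT + " = 31012");
            System.out.println(choosePort(explicit, 31011));              // 31012 (from config)
            System.out.println(choosePort(ConfigFactory.empty(), 31011)); // 31012 (control + 1)
        }
    }

Either way the chosen value is only a starting point: server.bind may still hunt upward for a free port when allowPortHunting is set.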
[3/5] drill git commit: DRILL-4980: Upgrading of the approach of parquet date correctness status detection - Parquet writer version is added; - Updated the detection method of parquet date correctness.
Posted by pa...@apache.org.
DRILL-4980: Upgrading of the approach of parquet date correctness status detection
- Parquet writer version is added;
- Updated the detection method of parquet date correctness.
This closes #644
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf29b917
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf29b917
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf29b917
Branch: refs/heads/master
Commit: cf29b917af4002a1ba0ef750ff3f5a553e910f8e
Parents: 2de3a27
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Wed Oct 26 12:22:06 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:29 2016 -0800
----------------------------------------------------------------------
.../drill/exec/store/parquet/Metadata.java | 61 ++++++++-----------
.../store/parquet/ParquetReaderUtility.java | 48 ++++++++++-----
.../exec/store/parquet/ParquetRecordWriter.java | 4 +-
.../drill/exec/store/parquet/ParquetWriter.java | 12 ++++
.../TestCorruptParquetDateCorrection.java | 6 +-
.../0_0_1.parquet | Bin 290 -> 292 bytes
.../0_0_2.parquet | Bin 290 -> 292 bytes
.../0_0_3.parquet | Bin 290 -> 292 bytes
.../0_0_4.parquet | Bin 290 -> 292 bytes
.../0_0_5.parquet | Bin 290 -> 292 bytes
.../0_0_6.parquet | Bin 290 -> 292 bytes
.../4203_corrected_dates.parquet | Bin 311 -> 313 bytes
...on_partitioned_metadata.requires_replace.txt | 2 +-
13 files changed, 76 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 04a2476..7b3afb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -189,9 +189,9 @@ public class Metadata {
childFiles.add(file);
}
}
- ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(true);
+ ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(DrillVersionInfo.getVersion());
if (childFiles.size() > 0) {
- List<ParquetFileMetadata_v3> childFilesMetadata =
+ List<ParquetFileMetadata_v3 > childFilesMetadata =
getParquetFileMetadata_v3(parquetTableMetadata, childFiles);
metaDataList.addAll(childFilesMetadata);
// Note that we do not need to merge the columnInfo at this point. The columnInfo is already added
@@ -674,9 +674,6 @@ public class Metadata {
@JsonIgnore public abstract ParquetTableMetadataBase clone();
@JsonIgnore public abstract String getDrillVersion();
-
- @JsonIgnore public abstract boolean isDateCorrect();
-
}
public static abstract class ParquetFileMetadata {
@@ -811,12 +808,6 @@ public class Metadata {
public String getDrillVersion() {
return null;
}
-
- @JsonIgnore @Override
- public boolean isDateCorrect() {
- return false;
- }
-
}
@@ -1025,31 +1016,29 @@ public class Metadata {
@JsonProperty List<ParquetFileMetadata_v2> files;
@JsonProperty List<String> directories;
@JsonProperty String drillVersion;
- @JsonProperty boolean isDateCorrect;
public ParquetTableMetadata_v2() {
super();
}
- public ParquetTableMetadata_v2(boolean isDateCorrect) {
- this.drillVersion = DrillVersionInfo.getVersion();
- this.isDateCorrect = isDateCorrect;
+ public ParquetTableMetadata_v2(String drillVersion) {
+ this.drillVersion = drillVersion;
}
public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
- List<ParquetFileMetadata_v2> files, List<String> directories) {
+ List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) {
this.files = files;
this.directories = directories;
this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
- this.drillVersion = DrillVersionInfo.getVersion();
- this.isDateCorrect = true;
+ this.drillVersion = drillVersion;
}
public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
- ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo) {
+ ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) {
this.files = files;
this.directories = directories;
this.columnTypeInfo = columnTypeInfo;
+ this.drillVersion = drillVersion;
}
public ColumnTypeMetadata_v2 getColumnTypeInfo(String[] name) {
@@ -1096,7 +1085,7 @@ public class Metadata {
}
@JsonIgnore @Override public ParquetTableMetadataBase clone() {
- return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
+ return new ParquetTableMetadata_v2(files, directories, columnTypeInfo, drillVersion);
}
@JsonIgnore @Override
@@ -1104,10 +1093,6 @@ public class Metadata {
return drillVersion;
}
- @JsonIgnore @Override public boolean isDateCorrect() {
- return isDateCorrect;
- }
-
}
@@ -1383,30 +1368,38 @@ public class Metadata {
@JsonProperty List<ParquetFileMetadata_v3> files;
@JsonProperty List<String> directories;
@JsonProperty String drillVersion;
- @JsonProperty boolean isDateCorrect;
+ /**
+ * Default constructor needed for deserialization from Parquet Metadata Cache Files
+ * or for creating an empty instance of this class for the case when the Metadata Cache File is absent
+ */
public ParquetTableMetadata_v3() {
super();
}
- public ParquetTableMetadata_v3(boolean isDateCorrect) {
- this.drillVersion = DrillVersionInfo.getVersion();
- this.isDateCorrect = isDateCorrect;
+ /**
+ * Used for creating the Parquet Metadata Cache File
+ * @param drillVersion actual version of apache drill
+ */
+ public ParquetTableMetadata_v3(String drillVersion) {
+ this.drillVersion = drillVersion;
}
public ParquetTableMetadata_v3(ParquetTableMetadataBase parquetTable,
- List<ParquetFileMetadata_v3> files, List<String> directories) {
+ List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) {
this.files = files;
this.directories = directories;
this.columnTypeInfo = ((ParquetTableMetadata_v3) parquetTable).columnTypeInfo;
- this.isDateCorrect = true;
+ this.drillVersion = drillVersion;
}
public ParquetTableMetadata_v3(List<ParquetFileMetadata_v3> files, List<String> directories,
- ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo) {
+ ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo,
+ String drillVersion) {
this.files = files;
this.directories = directories;
this.columnTypeInfo = columnTypeInfo;
+ this.drillVersion = drillVersion;
}
public ColumnTypeMetadata_v3 getColumnTypeInfo(String[] name) {
@@ -1453,7 +1446,7 @@ public class Metadata {
}
@JsonIgnore @Override public ParquetTableMetadataBase clone() {
- return new ParquetTableMetadata_v3(files, directories, columnTypeInfo);
+ return new ParquetTableMetadata_v3(files, directories, columnTypeInfo, drillVersion);
}
@JsonIgnore @Override
@@ -1461,10 +1454,6 @@ public class Metadata {
return drillVersion;
}
- @JsonIgnore @Override public boolean isDateCorrect() {
- return isDateCorrect;
- }
-
}
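An easy-to-miss detail in the hunks above is clone(): the old three-argument constructor never set drillVersion, so a cloned metadata object silently dropped the version recorded in the cache file. The simplified sketch below (class name is illustrative) captures the invariant the patch enforces: copies must propagate the stored version rather than losing it or re-stamping it from the running build.

    public class TableMetadataV3 {
        private final String drillVersion;

        public TableMetadataV3(String drillVersion) {
            this.drillVersion = drillVersion;
        }

        // Propagate the version that wrote the metadata; do not reset it.
        public TableMetadataV3 copy() {
            return new TableMetadataV3(drillVersion);
        }

        public String getDrillVersion() {
            return drillVersion;
        }
    }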
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 767c98d..b22e666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -59,19 +59,26 @@ public class ParquetReaderUtility {
*/
public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
/**
- * All old parquet files (which haven't "is.date.correct=true" property in metadata) have
- * a corrupt date shift: {@value} days or 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
+ * All old parquet files (which haven't "is.date.correct=true" or "parquet-writer.version" properties
+ * in metadata) have a corrupt date shift: {@value} days or 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
*/
public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
- // The year 5000 (or 1106685 day from Unix epoch) is chosen as the threshold for auto-detecting date corruption.
- // This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
- // be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
- // On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
- // something like 10,000 years in the past.
private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
+ /**
+ * The year 5000 (or 1106685 day from Unix epoch) is chosen as the threshold for auto-detecting date corruption.
+ * This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
+ * be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
+ * On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
+ * something like 10,000 years in the past.
+ */
public static final int DATE_CORRUPTION_THRESHOLD =
(int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
-
+ /**
+ * Version 2 (and later) of the Drill Parquet writer uses the date format described in the
+ * <a href="https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md#date">Parquet spec</a>.
+ * Prior versions had dates formatted with {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#CORRECT_CORRUPT_DATE_SHIFT}
+ */
+ public static final int DRILL_WRITER_VERSION_STD_DATE_FORMAT = 2;
/**
* For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
* based on the file metadata. For older files that lack statistics we must actually test the values
@@ -130,10 +137,9 @@ public class ParquetReaderUtility {
}
public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
- boolean isDateCorrect = parquetTableMetadata.isDateCorrect();
- DateCorruptionStatus cacheFileContainsCorruptDates = isDateCorrect ?
- DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_SHOWS_CORRUPTION;
- if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
+ DateCorruptionStatus cacheFileCanContainsCorruptDates = parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3 ?
+ DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
+ if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
// Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
String[] names = new String[0];
if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
@@ -189,9 +195,20 @@ public class ParquetReaderUtility {
String createdBy = footer.getFileMetaData().getCreatedBy();
String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
- String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
+ String stringWriterVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.WRITER_VERSION_PROPERTY);
+ // This flag can be present in parquet files which were generated with 1.9.0-SNAPSHOT and 1.9.0 drill versions.
+ // If this flag is present it means that the version of the drill parquet writer is 2
+ final String isDateCorrectFlag = "is.date.correct";
+ String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(isDateCorrectFlag);
if (drillVersion != null) {
- return Boolean.valueOf(isDateCorrect) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
+ int writerVersion = 1;
+ if (stringWriterVersion != null) {
+ writerVersion = Integer.parseInt(stringWriterVersion);
+ }
+ else if (Boolean.valueOf(isDateCorrect)) {
+ writerVersion = DRILL_WRITER_VERSION_STD_DATE_FORMAT;
+ }
+ return writerVersion >= DRILL_WRITER_VERSION_STD_DATE_FORMAT ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
: DateCorruptionStatus.META_SHOWS_CORRUPTION;
} else {
// Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
@@ -261,7 +278,8 @@ public class ParquetReaderUtility {
// this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
// creating a NameSegment makes sure we are using the standard code for comparing names,
// currently it is all case-insensitive
- if (AbstractRecordReader.isStarQuery(columns) || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+ if (AbstractRecordReader.isStarQuery(columns)
+ || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
int colIndex = -1;
ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
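The detection order established above can be summarized in one standalone method. The sketch below is illustrative (the map parameter stands in for footer.getFileMetaData().getKeyValueMetaData()): an explicit drill-writer.version property wins, the legacy is.date.correct flag written by 1.9.0-era builds maps to writer version 2, and any other Drill-written file is treated as writer version 1, i.e. corrupt dates.

    import java.util.HashMap;
    import java.util.Map;

    public class WriterVersionDetector {
        static final int STD_DATE_FORMAT_VERSION = 2; // DRILL_WRITER_VERSION_STD_DATE_FORMAT

        static int detectWriterVersion(Map<String, String> keyValueMetaData) {
            String version = keyValueMetaData.get("drill-writer.version");
            if (version != null) {
                return Integer.parseInt(version);
            }
            // Legacy flag from 1.9.0-SNAPSHOT/1.9.0 files implies the new date format.
            if (Boolean.parseBoolean(keyValueMetaData.get("is.date.correct"))) {
                return STD_DATE_FORMAT_VERSION;
            }
            return 1; // older Drill writers shifted dates
        }

        public static void main(String[] args) {
            Map<String, String> footer = new HashMap<>();
            footer.put("is.date.correct", "true");
            System.out.println(detectWriterVersion(footer)); // 2
        }
    }

Dates are then trusted whenever the detected version is at least DRILL_WRITER_VERSION_STD_DATE_FORMAT; files carrying no Drill metadata at all fall back to the value-based heuristic built around DATE_CORRUPTION_THRESHOLD.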
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index bc6bb65..4ee863a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -78,7 +78,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final String DRILL_VERSION_PROPERTY = "drill.version";
- public static final String IS_DATE_CORRECT_PROPERTY = "is.date.correct";
+ public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
@@ -116,7 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.partitionColumns = writer.getPartitionColumns();
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
- this.extraMetaData.put(IS_DATE_CORRECT_PROPERTY, "true");
+ this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 49c231e..716c56d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -40,6 +40,18 @@ import com.google.common.base.Preconditions;
public class ParquetWriter extends AbstractWriter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriter.class);
+/** Version of Drill's Parquet writer. Increment this version (by 1) any time we make any format change to the file.
+ * Format changes include:
+ * <ul>
+ * <li>Supporting new data types,
+ * <li>Changes to the format of data fields,
+ * <li>Adding new metadata to the file footer, etc.
+ * </ul>
+ * Newer readers must be able to read old files. The Writer version tells the Parquet reader how to interpret fields
+ * or metadata when that data changes format from one writer version to another.
+ */
+ public static final int WRITER_VERSION = 2;
+
private final String location;
private final List<String> partitionColumns;
private final ParquetFormatPlugin formatPlugin;
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index 27cc093..0ab247d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -67,8 +67,8 @@ import java.util.regex.Pattern;
public class TestCorruptParquetDateCorrection extends PlanTestBase {
// 4 files are in the directory:
- // - one created with the fixed version of the reader
- // - files have extra meta field: is.date.correct = true
+ // - one created with the parquet-writer version number of "2"
+ // - files have extra meta field: parquet-writer.version = 2
// - one from an old version of Drill, before we put in proper created by in metadata
// - this is read properly by looking at a Max value in the file statistics, to see that
// it is way off of a typical date value
@@ -123,7 +123,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
/**
* Test reading a directory full of partitioned parquet files with dates, these files have a drill version
- * number of 1.9.0-SNAPSHOT and is.date.correct = true label in their footers, so we can be certain
+ * number of "1.9.0-SNAPSHOT" and parquet-writer version number of "2" in their footers, so we can be certain
* they do not have corruption. The option to disable the correction is passed, but it will not change the result
* in the case where we are certain correction is NOT needed. For more info see DRILL-4203.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
index 0ec76c8..34224c1 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
index b43ab9d..8ce72e1 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
index 95d46bb..b96fa61 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
index 9f7269e..3c5a13a 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
index b0d370a..3a6db56 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
index a5c7b86..6e053ee 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
index 3d46f56..7a461d0 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/cf29b917/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
index bb6e282..54f18a8 100644
--- a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
+++ b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
@@ -297,5 +297,5 @@
} ]
} ],
"directories" : [ "file:REPLACED_IN_TEST/mixed_partitioned/1_9_0_partitioned_no_corruption", "file:REPLACED_IN_TEST/mixed_partitioned/partitioned_with_corruption_4203" ],
- "isDateCorrect" : "true"
+ "drillVersion" : "1.9.0-SNAPSHOT"
}
[5/5] drill git commit: DRILL-4995: Allow lazy init when dynamic UDF support is disabled
Posted by pa...@apache.org.
DRILL-4995: Allow lazy init when dynamic UDF support is disabled
This closes #645
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/04fb0be1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/04fb0be1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/04fb0be1
Branch: refs/heads/master
Commit: 04fb0be191ef09409c00ca7173cb903dfbe2abb0
Parents: 3f38118
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Nov 3 16:20:04 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:30 2016 -0800
----------------------------------------------------------------------
.../expr/fn/FunctionImplementationRegistry.java | 79 ++++++++++----------
.../drill/exec/planner/sql/DrillSqlWorker.java | 15 ++--
.../org/apache/drill/TestDynamicUDFSupport.java | 25 +++++++
.../record/ExpressionTreeMaterializerTest.java | 10 +++
4 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 988a9f6..d0aa33f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -140,9 +140,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
/**
- * Using the given <code>functionResolver</code> find Drill function implementation for given
- * <code>functionCall</code>
- * If function implementation was not found and in case if Dynamic UDF Support is enabled
+ * Using the given <code>functionResolver</code>
+ * finds Drill function implementation for given <code>functionCall</code>.
+ * If function implementation was not found,
* loads all missing remote functions and tries to find Drill implementation one more time.
*/
@Override
@@ -154,12 +154,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
AtomicLong version = new AtomicLong();
DrillFuncHolder holder = functionResolver.getBestMatch(
localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall);
- if (holder == null && retry) {
- if (optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
- if (loadRemoteFunctions(version.get())) {
- return findDrillFunction(functionResolver, functionCall, false);
- }
- }
+ if (holder == null && retry && loadRemoteFunctions(version.get())) {
+ return findDrillFunction(functionResolver, functionCall, false);
}
return holder;
}
@@ -183,7 +179,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
/**
* Find the Drill function implementation that matches the name, arg types and return type.
- * If exact function implementation was not found and in case if Dynamic UDF Support is enabled
+ * If exact function implementation was not found,
* loads all missing remote functions and tries to find Drill implementation one more time.
*/
public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
@@ -197,11 +193,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
return h;
}
}
-
- if (retry && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
- if (loadRemoteFunctions(version.get())) {
- return findExactMatchingDrillFunction(name, argTypes, returnType, false);
- }
+ if (retry && loadRemoteFunctions(version.get())) {
+ return findExactMatchingDrillFunction(name, argTypes, returnType, false);
}
return null;
}
@@ -287,35 +280,39 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
if (!missingJars.isEmpty()) {
synchronized (this) {
missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
- List<JarScan> jars = Lists.newArrayList();
- for (String jarName : missingJars) {
- Path binary = null;
- Path source = null;
- URLClassLoader classLoader = null;
- try {
- binary = copyJarToLocal(jarName, remoteFunctionRegistry);
- source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
- URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
- classLoader = new URLClassLoader(urls);
- ScanResult scanResult = scan(classLoader, binary, urls);
- localFunctionRegistry.validate(jarName, scanResult);
- jars.add(new JarScan(jarName, scanResult, classLoader));
- } catch (Exception e) {
- deleteQuietlyLocalJar(binary);
- deleteQuietlyLocalJar(source);
- if (classLoader != null) {
- try {
- classLoader.close();
- } catch (Exception ex) {
- logger.warn("Problem during closing class loader for {}", jarName, e);
+ if (!missingJars.isEmpty()) {
+ logger.info("Starting dynamic UDFs lazy-init process.\n" +
+ "The following jars are going to be downloaded and registered locally: " + missingJars);
+ List<JarScan> jars = Lists.newArrayList();
+ for (String jarName : missingJars) {
+ Path binary = null;
+ Path source = null;
+ URLClassLoader classLoader = null;
+ try {
+ binary = copyJarToLocal(jarName, remoteFunctionRegistry);
+ source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
+ URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+ classLoader = new URLClassLoader(urls);
+ ScanResult scanResult = scan(classLoader, binary, urls);
+ localFunctionRegistry.validate(jarName, scanResult);
+ jars.add(new JarScan(jarName, scanResult, classLoader));
+ } catch (Exception e) {
+ deleteQuietlyLocalJar(binary);
+ deleteQuietlyLocalJar(source);
+ if (classLoader != null) {
+ try {
+ classLoader.close();
+ } catch (Exception ex) {
+ logger.warn("Problem during closing class loader for {}", jarName, e);
+ }
}
+ logger.error("Problem during remote functions load from {}", jarName, e);
}
- logger.error("Problem during remote functions load from {}", jarName, e);
}
- }
- if (!jars.isEmpty()) {
- localFunctionRegistry.register(jars);
- return true;
+ if (!jars.isEmpty()) {
+ localFunctionRegistry.register(jars);
+ return true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 19123d0..76529d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,7 +24,6 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FunctionNotFoundException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.QueryContext;
@@ -113,7 +112,7 @@ public class DrillSqlWorker {
/**
* Returns query physical plan.
- * In case of {@link FunctionNotFoundException} and dynamic udf support is enabled, attempts to load remote functions.
+ * In case of {@link FunctionNotFoundException} attempts to load remote functions.
* If at least one function was loaded or local function function registry version has changed,
* makes one more attempt to get query physical plan.
*/
@@ -122,13 +121,11 @@ public class DrillSqlWorker {
try {
return handler.getPlan(sqlNode);
} catch (FunctionNotFoundException e) {
- if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
- DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
- FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
- if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
- drillOperatorTable.reloadOperators(functionRegistry);
- return handler.getPlan(sqlNode);
- }
+ DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
+ FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
+ if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
+ drillOperatorTable.reloadOperators(functionRegistry);
+ return handler.getPlan(sqlNode);
}
throw e;
}
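Both registry lookups changed above rely on the same recursion guard: a boolean retry parameter that permits exactly one remote load before giving up. Abstracted away from Drill's types, the pattern looks like the hypothetical sketch below.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class RetryingLookup {
        private final Map<String, String> local = new ConcurrentHashMap<>();

        // A miss triggers at most one remote load; the flag flips to false
        // on the recursive call, so a second miss returns null instead of looping.
        String find(String name, boolean retry) {
            String value = local.get(name);
            if (value == null && retry && loadRemote(name)) {
                return find(name, false);
            }
            return value;
        }

        private boolean loadRemote(String name) {
            // Stand-in for downloading, validating and registering missing UDF jars.
            local.put(name, "resolved:" + name);
            return true;
        }
    }

The substance of the change is that this path no longer consults exec.udf.enable_dynamic_support, so a function registered while the option was enabled can still be lazily initialized after the option is turned off; that is exactly the scenario testLazyInitWhenDynamicUdfSupportIsDisabled covers.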
http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
index ae73a3d..4676e39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
@@ -339,6 +339,31 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
}
@Test
+ public void testLazyInitWhenDynamicUdfSupportIsDisabled() throws Exception {
+ try {
+ test("select custom_lower('A') from (values(1))");
+ } catch (UserRemoteException e){
+ assertThat(e.getMessage(), containsString("No match found for function signature custom_lower(<CHARACTER>)"));
+ }
+
+ copyDefaultJarsToStagingArea();
+ test("create function using jar '%s'", default_binary_name);
+
+ try {
+ testBuilder()
+ .sqlQuery("select custom_lower('A') as res from (values(1))")
+ .optionSettingQueriesForTestQuery("alter system set `exec.udf.enable_dynamic_support` = false")
+ .unOrdered()
+ .baselineColumns("res")
+ .baselineValues("a")
+ .go();
+ } finally {
+ test("alter system reset `exec.udf.enable_dynamic_support`");
+ }
+ }
+
+
+ @Test
public void testDropFunction() throws Exception {
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", default_binary_name);
http://git-wip-us.apache.org/repos/asf/drill/blob/04fb0be1/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 7d28c9b..8b338af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
@@ -42,6 +44,8 @@ import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@@ -196,6 +200,12 @@ public class ExpressionTreeMaterializerTest extends ExecTest {
}
};
+ new MockUp<RemoteFunctionRegistry>() {
+ @Mock
+ Registry getRegistry() {
+ return Registry.getDefaultInstance();
+ }
+ };
LogicalExpression functionCallExpr = new FunctionCall("testFunc",
ImmutableList.of((LogicalExpression) new FieldReference("test", ExpressionPosition.UNKNOWN) ),
[4/5] drill git commit: DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException
Posted by pa...@apache.org.
DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException
This closes #653
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3f381181
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3f381181
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3f381181
Branch: refs/heads/master
Commit: 3f3811818ecc3bbf6f307a408c30f0406fadc703
Parents: cf29b91
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Thu Oct 20 05:03:13 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 23 09:33:29 2016 -0800
----------------------------------------------------------------------
.../drill/exec/store/parquet/Metadata.java | 68 +++++++++++++++++---
1 file changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3f381181/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 7b3afb4..d819562 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -40,6 +41,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -209,11 +213,17 @@ public class Metadata {
for (String oldname : OLD_METADATA_FILENAMES) {
fs.delete(new Path(p, oldname), false);
}
- writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
+ // writeFile creates and writes to a tmp file first and then renames it
+ // to final metadata cache file name. We want the UUID appended to tmp file
+ // to be same for METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME
+ // so we can track/debug things better.
+ // Generate UUID used for tmp file creation here
+ UUID tmpUUID = UUID.randomUUID();
+ writeFile(parquetTableMetadata, path, tmpUUID);
if (directoryList.size() > 0 && childFiles.size() == 0) {
ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
- writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
+ writeFile(parquetTableMetadataDirs, path, tmpUUID);
logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
@@ -489,13 +499,35 @@ public class Metadata {
}
/**
+ * Renames Path srcPath to Path dstPath.
+ *
+ * @param srcPath
+ * @param dstPath
+ * @throws IOException
+ */
+ private void renameFile(Path srcPath, Path dstPath) throws IOException {
+ try {
+ // Use fileContext API as FileSystem rename is deprecated.
+ FileContext fileContext = FileContext.getFileContext(srcPath.toUri());
+ fileContext.rename(srcPath, dstPath, Options.Rename.OVERWRITE);
+ } catch (Exception e) {
+ logger.info("Metadata cache file rename from {} to {} failed", srcPath.toString(), dstPath.toString(), e);
+ throw new IOException("metadata cache file rename failed", e);
+ } finally {
+ if (fs.exists(srcPath)) {
+ fs.delete(srcPath, false);
+ }
+ }
+ }
+
+ /**
* Serialize parquet metadata to json and write to a file
*
* @param parquetTableMetadata
* @param p
* @throws IOException
*/
- private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p) throws IOException {
+ private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, String path, UUID tmpUUID) throws IOException {
JsonFactory jsonFactory = new JsonFactory();
jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
@@ -503,23 +535,39 @@ public class Metadata {
SimpleModule module = new SimpleModule();
module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
mapper.registerModule(module);
- FSDataOutputStream os = fs.create(p);
+
+ // If multiple clients are updating metadata cache file concurrently, the cache file
+ // can get corrupted. To prevent this, write to a unique temporary file and then do
+ // atomic rename.
+ Path tmpPath = new Path(path, METADATA_FILENAME + "." + tmpUUID);
+ FSDataOutputStream os = fs.create(tmpPath);
mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
os.flush();
os.close();
+
+ Path finalPath = new Path(path, METADATA_FILENAME);
+ renameFile(tmpPath, finalPath);
}
- private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p) throws IOException {
+ private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, String path, UUID tmpUUID) throws IOException {
JsonFactory jsonFactory = new JsonFactory();
jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
ObjectMapper mapper = new ObjectMapper(jsonFactory);
SimpleModule module = new SimpleModule();
mapper.registerModule(module);
- FSDataOutputStream os = fs.create(p);
+
+ // If multiple clients are updating metadata cache file concurrently, the cache file
+ // can get corrupted. To prevent this, write to a unique temporary file and then do
+ // atomic rename.
+ Path tmpPath = new Path(path, METADATA_DIRECTORIES_FILENAME + "." + tmpUUID);
+ FSDataOutputStream os = fs.create(tmpPath);
mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
os.flush();
os.close();
+
+ Path finalPath = new Path(path, METADATA_DIRECTORIES_FILENAME);
+ renameFile(tmpPath, finalPath);
}
/**
@@ -562,8 +610,10 @@ public class Metadata {
logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
+ // Do not remove scheme and authority from the path passed to createMetaFilesRecursively
+ // as we need full path to obtain proper fileContext in writeFile
parquetTableMetadataDirs =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+ (createMetaFilesRecursively(p.getParent().toString())).getRight();
newMetadata = true;
}
} else {
@@ -571,8 +621,10 @@ public class Metadata {
logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
+ // Do not remove scheme and authority from the path passed to createMetaFilesRecursively
+ // as we need full path to obtain proper fileContext in writeFile
parquetTableMetadata =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
+ (createMetaFilesRecursively(p.getParent().toString())).getLeft();
newMetadata = true;
}
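The concurrency fix in this commit reduces to the classic write-to-temp-then-rename idiom. The sketch below is a local-filesystem analogue using java.nio (the patch itself uses Hadoop's FileContext.rename with Options.Rename.OVERWRITE because FileSystem.rename is deprecated); class and method names are illustrative.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;

    public class AtomicCacheWriter {
        // Each writer targets a unique temp file, so concurrent refreshes never
        // interleave writes; readers only ever observe a complete cache file.
        public static void write(Path dir, String fileName, byte[] json) throws IOException {
            Path tmp = dir.resolve(fileName + "." + UUID.randomUUID());
            try {
                Files.write(tmp, json);
                Files.move(tmp, dir.resolve(fileName),
                    StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                Files.deleteIfExists(tmp); // mirror renameFile's cleanup on failure
                throw e;
            }
        }
    }

Sharing one UUID between METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME, as the patch does, means both temp files from a single refresh carry the same suffix, which makes any stray files attributable to one run when debugging.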