You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/26 16:59:16 UTC
[1/5] drill git commit: DRILL-2275: Added support to get information
about current cluster memory and threads
Repository: drill
Updated Branches:
refs/heads/master 7bded6d3c -> 50ad974e9
DRILL-2275: Added support to get information about current cluster memory and threads
+ SystemRecordReader reads a SystemRecord e.g. MemoryRecord
+ Added generic data type for static tables
+ GroupScan can enforce its maximum parallelization width in ExcessiveExchangeIdentifier
+ GroupScan has minimum width for SimpleParallelizer
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8ab361b1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8ab361b1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8ab361b1
Branch: refs/heads/master
Commit: 8ab361b123314b225733603560de36c170bbc117
Parents: 7bded6d
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri Mar 6 17:07:31 2015 -0800
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:29 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/JSONOptions.java | 24 +++-
.../drill/exec/memory/TopLevelAllocator.java | 3 +
.../exec/physical/base/AbstractGroupScan.java | 14 +-
.../drill/exec/physical/base/GroupScan.java | 16 +++
.../planner/fragment/ParallelizationInfo.java | 10 +-
.../planner/fragment/SimpleParallelizer.java | 4 +-
.../drill/exec/planner/fragment/Stats.java | 4 +
.../exec/planner/fragment/StatsCollector.java | 4 +-
.../visitor/ExcessiveExchangeIdentifier.java | 7 +
.../apache/drill/exec/store/RecordDataType.java | 65 +++++++++
.../drill/exec/store/pojo/PojoDataType.java | 42 +++---
.../drill/exec/store/sys/DrillbitIterator.java | 4 +-
.../drill/exec/store/sys/MemoryRecord.java | 141 +++++++++++++++++++
.../drill/exec/store/sys/StaticDrillTable.java | 19 +--
.../drill/exec/store/sys/SystemRecord.java | 44 ++++++
.../exec/store/sys/SystemRecordReader.java | 77 ++++++++++
.../drill/exec/store/sys/SystemTable.java | 93 +++++++++---
.../exec/store/sys/SystemTableBatchCreator.java | 29 +++-
.../drill/exec/store/sys/SystemTablePlugin.java | 68 ++++-----
.../exec/store/sys/SystemTablePluginConfig.java | 12 +-
.../drill/exec/store/sys/SystemTableScan.java | 56 ++++++--
.../drill/exec/store/sys/ThreadsRecord.java | 119 ++++++++++++++++
.../drill/exec/store/sys/TestSystemTable.java | 12 ++
23 files changed, 749 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/JSONOptions.java b/common/src/main/java/org/apache/drill/common/JSONOptions.java
index 64e6d52..945cd92 100644
--- a/common/src/main/java/org/apache/drill/common/JSONOptions.java
+++ b/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -48,7 +48,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
@JsonDeserialize(using = De.class)
public class JSONOptions {
- final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
+ private final static Logger logger = LoggerFactory.getLogger(JSONOptions.class);
private JsonNode root;
private JsonLocation location;
@@ -67,17 +67,30 @@ public class JSONOptions {
public <T> T getWith(DrillConfig config, Class<T> c) {
try {
if (opaque != null) {
- if (opaque.getClass().equals(c)) {
+ final Class<?> opaqueClass = opaque.getClass();
+ if (opaqueClass.equals(c)) {
return (T) opaque;
} else {
- throw new IllegalArgumentException(String.format("Attmpted to retrieve a option with type of %s. However, the JSON options carried an opaque value of type %s.", c.getName(), opaque.getClass().getName()));
+ // Enum values that override methods are given $1, $2 ... extensions. Ignore the extension.
+ // e.g. SystemTable$1 for SystemTable.OPTION
+ if (c.isEnum()) {
+ final String opaqueName = opaqueClass.getName().replaceAll("\\$\\d+$", "");
+ final String cName = c.getName();
+ if(opaqueName.equals(cName)) {
+ return (T) opaque;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Attempted to retrieve a option with type of %s. " +
+ "However, the JSON options carried an opaque value of type %s.", c.getName(), opaqueClass.getName()));
}
}
//logger.debug("Read tree {}", root);
return config.getMapper().treeToValue(root, c);
} catch (JsonProcessingException e) {
- throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound json options to type of %s. Reference was originally located at line %d, column %d.", c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
+ throw new LogicalPlanParsingException(String.format("Failure while trying to convert late bound " +
+ "json options to type of %s. Reference was originally located at line %d, column %d.",
+ c.getCanonicalName(), location.getLineNr(), location.getColumnNr()), e);
}
}
@@ -95,7 +108,8 @@ public class JSONOptions {
if ( c.equals(opaque.getClass())) {
return (T) opaque;
} else {
- throw new IOException(String.format("Attmpted to retrieve a list with type of %s. However, the JSON options carried an opaque value of type %s.", t.getType(), opaque.getClass().getName()));
+ throw new IOException(String.format("Attempted to retrieve a list with type of %s. However, the JSON " +
+ "options carried an opaque value of type %s.", t.getType(), opaque.getClass().getName()));
}
}
if (root == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index af8c1dc..d22651e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -37,6 +37,8 @@ import org.apache.drill.exec.util.Pointer;
public class TopLevelAllocator implements BufferAllocator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
+ public static long MAXIMUM_DIRECT_MEMORY;
+
private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
private final Map<ChildAllocator, StackTraceElement[]> childrenMap;
private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
@@ -56,6 +58,7 @@ public class TopLevelAllocator implements BufferAllocator {
}
private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
+ MAXIMUM_DIRECT_MEMORY = maximumAllocation;
this.config=(config!=null) ? config : DrillConfig.create();
this.errorOnLeak = errorOnLeak;
this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true);
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 276ecb5..8fe21e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterators;
import org.apache.drill.exec.physical.EndpointAffinity;
public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
@Override
public Iterator<PhysicalOperator> iterator() {
@@ -57,6 +57,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
@Override
@JsonIgnore
+ public int getMinParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ @JsonIgnore
+ public boolean enforceWidth() {
+ return false;
+ }
+
+ @Override
+ @JsonIgnore
public long getInitialAllocation() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 23860a3..60b8330 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -45,6 +45,22 @@ public interface GroupScan extends Scan, HasAffinity{
public int getMaxParallelizationWidth();
/**
+ * At minimum, the GroupScan requires this many fragments to run.
+ * Currently, this is used in {@link org.apache.drill.exec.planner.fragment.SimpleParallelizer}
+ * @return the minimum number of fragments that should run
+ */
+ @JsonIgnore
+ public int getMinParallelizationWidth();
+
+ /**
+ * Check if GroupScan enforces width to be maximum parallelization width.
+ * Currently, this is used in {@link org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier}
+ * @return if maximum width should be enforced
+ */
+ @JsonIgnore
+ public boolean enforceWidth();
+
+ /**
* Returns a signature of the {@link GroupScan} which should usually be composed of
* all its attributes which could describe it uniquely.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
index 75a009e..8e775af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
@@ -96,8 +96,8 @@ public class ParallelizationInfo {
private final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap();
public void add(ParallelizationInfo parallelizationInfo) {
- this.minWidth = Math.max(minWidth, parallelizationInfo.minWidth);
- this.maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth);
+ minWidth = Math.max(minWidth, parallelizationInfo.minWidth);
+ maxWidth = Math.min(maxWidth, parallelizationInfo.maxWidth);
Map<DrillbitEndpoint, EndpointAffinity> affinityMap = parallelizationInfo.getEndpointAffinityMap();
for(Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : affinityMap.entrySet()) {
@@ -106,7 +106,11 @@ public class ParallelizationInfo {
}
public void addMaxWidth(int newMaxWidth) {
- this.maxWidth = Math.min(maxWidth, newMaxWidth);
+ maxWidth = Math.min(maxWidth, newMaxWidth);
+ }
+
+ public void addMinWidth(int newMinWidth) {
+ minWidth = Math.max(minWidth, newMinWidth);
}
public void addEndpointAffinities(List<EndpointAffinity> endpointAffinities) {
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 12043ce..66ba229 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -264,8 +264,8 @@ public class SimpleParallelizer {
}
if (width < numRequiredNodes) {
- throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width is " +
- "less than the number of mandatory nodes (nodes with +INFINITE affinity).");
+ throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
+ "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
}
// Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
index e61b38f..b5b8ce4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -39,6 +39,10 @@ public class Stats {
collector.addMaxWidth(maxWidth);
}
+ public void addMinWidth(int minWidth) {
+ collector.addMinWidth(minWidth);
+ }
+
public void addEndpointAffinities(List<EndpointAffinity> endpointAffinityList) {
collector.addEndpointAffinities(endpointAffinityList);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 1f56556..4f4e0b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -85,7 +85,9 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
@Override
public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
- wrapper.getStats().addMaxWidth(groupScan.getMaxParallelizationWidth());
+ final Stats stats = wrapper.getStats();
+ stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
+ stats.addMinWidth(groupScan.getMinParallelizationWidth());
return super.visitGroupScan(groupScan, wrapper);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index a237014..9d74802 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -94,6 +94,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
class MajorFragmentStat {
private double maxRows = 0d;
private int maxWidth = Integer.MAX_VALUE;
+ private boolean enforceWidth = false;
public void add(Prel prel) {
maxRows = Math.max(prel.getRows(), maxRows);
@@ -105,10 +106,16 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
public void addScan(ScanPrel prel) {
maxWidth = Math.min(maxWidth, prel.getGroupScan().getMaxParallelizationWidth());
+ enforceWidth = prel.getGroupScan().enforceWidth();
add(prel);
}
public boolean isSingular() {
+ // do not remove exchanges when a scan enforces width (e.g. SystemTableScan)
+ if (enforceWidth) {
+ return false;
+ }
+
int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize);
int w = Math.min(maxWidth, suggestedWidth);
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
new file mode 100644
index 0000000..889db12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordDataType.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.util.List;
+
+/**
+ * RecordDataType defines names and data types of columns in a static drill table.
+ */
+public abstract class RecordDataType {
+
+ /**
+ * @return the {@link org.eigenbase.sql.type.SqlTypeName} of columns in the table
+ */
+ public abstract List<SqlTypeName> getFieldSqlTypeNames();
+
+ /**
+ * @return the column names in the table
+ */
+ public abstract List<String> getFieldNames();
+
+ /**
+ * This method constructs a {@link org.eigenbase.reltype.RelDataType} based on the
+ * {@link org.apache.drill.exec.store.RecordDataType}'s field sql types and field names.
+ *
+ * @param factory helps construct a {@link org.eigenbase.reltype.RelDataType}
+ * @return the constructed type
+ */
+ public final RelDataType getRowType(RelDataTypeFactory factory) {
+ final List<SqlTypeName> types = getFieldSqlTypeNames();
+ final List<String> names = getFieldNames();
+ final List<RelDataType> fields = Lists.newArrayList();
+ for (final SqlTypeName typeName : types) {
+ switch (typeName) {
+ case VARCHAR:
+ fields.add(factory.createSqlType(typeName, Integer.MAX_VALUE));
+ break;
+ default:
+ fields.add(factory.createSqlType(typeName));
+ }
+ }
+ return factory.createStructType(fields, names);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
index c1e64e6..2acb727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -22,24 +22,24 @@ import java.lang.reflect.Modifier;
import java.sql.Timestamp;
import java.util.List;
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeFactory;
+import org.apache.drill.exec.store.RecordDataType;
import org.eigenbase.sql.type.SqlTypeName;
import com.google.common.collect.Lists;
-public class PojoDataType {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+/**
+ * This class uses reflection of a Java class to construct a {@link org.apache.drill.exec.store.RecordDataType}.
+ */
+public class PojoDataType extends RecordDataType {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
- public List<SqlTypeName> types = Lists.newArrayList();
- public List<String> names = Lists.newArrayList();
+ private final List<SqlTypeName> types = Lists.newArrayList();
+ private final List<String> names = Lists.newArrayList();
+ private final Class<?> pojoClass;
public PojoDataType(Class<?> pojoClass) {
- logger.debug(pojoClass.getName());
- Field[] fields = pojoClass.getDeclaredFields();
- for (int i = 0; i < fields.length; i++) {
- Field f = fields[i];
-
+ this.pojoClass = pojoClass;
+ for (Field f : pojoClass.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers())) {
continue;
}
@@ -62,17 +62,23 @@ public class PojoDataType {
} else if (type == Timestamp.class) {
types.add(SqlTypeName.TIMESTAMP);
} else {
- throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+ throw new RuntimeException(String.format("PojoDataType doesn't yet support conversions from type [%s].", type));
}
}
}
- public RelDataType getRowType(RelDataTypeFactory f) {
- List<RelDataType> fields = Lists.newArrayList();
- for (SqlTypeName n : types) {
- fields.add(f.createSqlType(n));
- }
- return f.createStructType(fields, names);
+ public Class<?> getPojoClass() {
+ return pojoClass;
+ }
+
+ @Override
+ public List<SqlTypeName> getFieldSqlTypeNames() {
+ return types;
+ }
+
+ @Override
+ public List<String> getFieldNames() {
+ return names;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index 67d7cf9..08bc0ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -34,7 +34,7 @@ public class DrillbitIterator implements Iterator<Object> {
}
public static class DrillbitInstance {
- public String host;
+ public String hostname;
public int user_port;
public int control_port;
public int data_port;
@@ -51,7 +51,7 @@ public class DrillbitIterator implements Iterator<Object> {
DrillbitEndpoint ep = endpoints.next();
DrillbitInstance i = new DrillbitInstance();
i.current = ep.equals(current);
- i.host = ep.getAddress();
+ i.hostname = ep.getAddress();
i.user_port = ep.getUserPort();
i.control_port = ep.getControlPort();
i.data_port = ep.getDataPort();
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
new file mode 100644
index 0000000..9cb001d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.List;
+
+/**
+ * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit memory
+ */
+public class MemoryRecord extends SystemRecord {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryRecord.class);
+
+ private static final MemoryRecord INSTANCE = new MemoryRecord();
+
+ public static SystemRecord getInstance() {
+ return INSTANCE;
+ }
+
+ private static final String HOST_NAME = "hostname";
+ private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
+ Types.required(TypeProtos.MinorType.VARCHAR));
+ private VarCharVector hostName;
+
+ private static final String USER_PORT = "user_port";
+ private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector userPort;
+
+ private static final String CURRENT_HEAP_SIZE = "heap_current";
+ private static final MaterializedField currentHeapSizeField = MaterializedField.create(CURRENT_HEAP_SIZE,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector currentHeapSize;
+
+ private static final String MAX_HEAP_SIZE = "heap_max";
+ private static final MaterializedField maxHeapSizeField = MaterializedField.create(MAX_HEAP_SIZE,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector maxHeapSize;
+
+ private static final String CURRENT_DIRECT_MEMORY = "direct_current";
+ private static final MaterializedField currentDirectMemoryField = MaterializedField.create(CURRENT_DIRECT_MEMORY,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector currentDirectMemory;
+
+ private static final String MAX_DIRECT_MEMORY = "direct_max";
+ private static final MaterializedField maxDirectMemoryField = MaterializedField.create(MAX_DIRECT_MEMORY,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector maxDirectMemory;
+
+ private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
+ SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT);
+
+ private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, CURRENT_HEAP_SIZE, MAX_HEAP_SIZE,
+ CURRENT_DIRECT_MEMORY, MAX_DIRECT_MEMORY);
+
+ private MemoryRecord() {
+ }
+
+ @Override
+ public void setup(final OutputMutator output) throws SchemaChangeException {
+ hostName = output.addField(hostNameField, VarCharVector.class);
+ userPort = output.addField(userPortField, BigIntVector.class);
+ currentHeapSize = output.addField(currentHeapSizeField, BigIntVector.class);
+ maxHeapSize = output.addField(maxHeapSizeField, BigIntVector.class);
+ currentDirectMemory = output.addField(currentDirectMemoryField, BigIntVector.class);
+ maxDirectMemory = output.addField(maxDirectMemoryField, BigIntVector.class);
+ }
+
+ @Override
+ public void setRecordValues(final FragmentContext context) {
+ final DrillbitContext drillbitContext = context.getDrillbitContext();
+ final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+ final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+ final String address = endpoint.getAddress();
+ final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
+ hostNameMutator.setSafe(0, address.getBytes());
+ hostNameMutator.setValueCount(1);
+
+ final int port = endpoint.getUserPort();
+ final BigIntVector.Mutator userPortMutator = userPort.getMutator();
+ userPortMutator.setSafe(0, port);
+ userPortMutator.setValueCount(1);
+
+ final BigIntVector.Mutator currentHeapSizeMutator = currentHeapSize.getMutator();
+ currentHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getUsed());
+ currentHeapSizeMutator.setValueCount(1);
+
+ final BigIntVector.Mutator maxHeapSizeMutator = maxHeapSize.getMutator();
+ maxHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getMax());
+ maxHeapSizeMutator.setValueCount(1);
+
+ final BigIntVector.Mutator currentDirectMemoryMutator = currentDirectMemory.getMutator();
+ currentDirectMemoryMutator.setSafe(0, drillbitContext.getAllocator().getAllocatedMemory());
+ currentDirectMemoryMutator.setValueCount(1);
+
+ final BigIntVector.Mutator maxDirectMemoryMutator = maxDirectMemory.getMutator();
+ maxDirectMemoryMutator.setSafe(0, TopLevelAllocator.MAXIMUM_DIRECT_MEMORY);
+ maxDirectMemoryMutator.setValueCount(1);
+ }
+
+ @Override
+ public List<SqlTypeName> getFieldSqlTypeNames() {
+ return FIELDS;
+ }
+
+ @Override
+ public List<String> getFieldNames() {
+ return NAMES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
index c1e8dd1..d9374cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -18,24 +18,27 @@
package org.apache.drill.exec.store.sys;
import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.RecordDataType;
import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.store.pojo.PojoDataType;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeFactory;
-public class StaticDrillTable extends DrillTable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+/**
+ * A {@link org.apache.drill.exec.planner.logical.DrillTable} with a defined schema
+ * Currently, this is a wrapper class for {@link org.apache.drill.exec.store.sys.SystemTable}.
+ */
+public class StaticDrillTable extends DrillTable {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
- private final PojoDataType type;
+ private final RecordDataType dataType;
- public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+ public StaticDrillTable(String storageEngineName, StoragePlugin plugin, Object selection, RecordDataType dataType) {
super(storageEngineName, plugin, selection);
- this.type = type;
+ this.dataType = dataType;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return type.getRowType(typeFactory);
+ return dataType.getRowType(typeFactory);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
new file mode 100644
index 0000000..5bdb9b1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordDataType;
+
+/**
+ * A system record holds system information (e.g. memory usage).
+ * Currently, there is only one such system record per Drillbit.
+ */
+public abstract class SystemRecord extends RecordDataType {
+
+ /**
+ * Setup value vectors to hold system information
+ * @param output the mutator from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
+ * @throws SchemaChangeException
+ */
+ public abstract void setup(OutputMutator output) throws SchemaChangeException;
+
+ /**
+ * Set the values of value vectors when requested
+ * @param context the context from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
+ */
+ public abstract void setRecordValues(FragmentContext context);
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
new file mode 100644
index 0000000..9f8d0d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+
+/**
+ * A record reader to populate a {@link SystemRecord}.
+ */
+public class SystemRecordReader extends AbstractRecordReader {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemRecordReader.class);
+
+ private final FragmentContext fragmentContext;
+ private final SystemRecord record;
+ private boolean read;
+
+ private OperatorContext operatorContext;
+
+ public SystemRecordReader(FragmentContext context, SystemRecord record) {
+ this.fragmentContext = context;
+ this.record = record;
+ this.read = false;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ try {
+ record.setup(output);
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public void setOperatorContext(OperatorContext operatorContext) {
+ this.operatorContext = operatorContext;
+ }
+
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public int next() {
+ // send only one record
+ if (!read) {
+ record.setRecordValues(fragmentContext);
+ read = true;
+ return 1;
+ }
+ return 0;
+ }
+
+ @Override
+ public void cleanup() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 0bf2156..2c338ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -17,40 +17,93 @@
*/
package org.apache.drill.exec.store.sys;
+import com.google.common.collect.Iterators;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.RecordDataType;
import org.apache.drill.exec.store.pojo.PojoDataType;
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeFactory;
+import java.util.Iterator;
+
+/**
+ * An enumeration of all system tables that Drill supports.
+ * <p/>
+ * OPTION, DRILLBITS and VERSION are local tables available on every Drillbit.
+ * MEMORY and THREADS are distributed tables with one record on every Drillbit.
+ */
public enum SystemTable {
- OPTION("options", OptionValue.class),
- DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class),
- VERSION("version", VersionIterator.VersionInfo.class)
- ;
- private final PojoDataType type;
+ OPTION("options", false, new PojoDataType(OptionValue.class)) {
+ @Override
+ public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
+ final OptionManager fragmentOptions = context.getOptions();
+ return (Iterator<Object>) (Object) Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
+ }
+ },
+
+ DRILLBITS("drillbits", false, new PojoDataType(DrillbitIterator.DrillbitInstance.class)) {
+ @Override
+ public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ return new DrillbitIterator(context);
+ }
+ },
+
+ VERSION("version", false, new PojoDataType(VersionIterator.VersionInfo.class)) {
+ @Override
+ public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ return new VersionIterator();
+ }
+ },
+
+ MEMORY("memory", true, MemoryRecord.getInstance()) {
+ @Override
+ public SystemRecord getSystemRecord() {
+ return MemoryRecord.getInstance();
+ }
+ },
+
+ THREADS("threads", true, ThreadsRecord.getInstance()) {
+ @Override
+ public SystemRecord getSystemRecord() {
+ return ThreadsRecord.getInstance();
+ }
+ };
+
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTable.class);
+
private final String tableName;
- private final Class<?> pojoClass;
+ private final boolean distributed;
+ private final RecordDataType dataType;
- SystemTable(String tableName, Class<?> clazz){
- this.type = new PojoDataType(clazz);
+ SystemTable(String tableName, boolean distributed, RecordDataType dataType) {
this.tableName = tableName;
- this.pojoClass = clazz;
+ this.distributed = distributed;
+ this.dataType = dataType;
}
- public String getTableName(){
- return tableName;
+ // Distributed tables must override this method
+ public SystemRecord getSystemRecord() {
+ throw new UnsupportedOperationException("Local table does not support this function.");
}
- public RelDataType getRowType(RelDataTypeFactory f){
- return type.getRowType(f);
+ // Local tables must override this method
+ public Iterator<Object> getLocalIterator(FragmentContext context) {
+ throw new UnsupportedOperationException("Distributed table does not support this function.");
}
- public PojoDataType getType(){
- return type;
+ public String getTableName() {
+ return tableName;
}
- public Class<?> getPojoClass(){
- return pojoClass;
+ public boolean isDistributed() {
+ return distributed;
}
-}
\ No newline at end of file
+
+ public RecordDataType getDataType() {
+ return dataType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index a1bec1e..0152b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -27,18 +27,33 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoDataType;
import org.apache.drill.exec.store.pojo.PojoRecordReader;
-public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+/**
+ * This class creates batches based on the type of {@link org.apache.drill.exec.store.sys.SystemTable}.
+ * The distributed tables and the local tables use different record readers.
+ * Local system tables do not require a full-fledged query because these records are present on every Drillbit.
+ */
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
- throws ExecutionSetupException {
- Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
- PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+ public RecordBatch getBatch(final FragmentContext context, final SystemTableScan scan,
+ final List<RecordBatch> children)
+ throws ExecutionSetupException {
+ final SystemTable table = scan.getTable();
+ final RecordReader reader;
+ if (table.isDistributed()) {
+ final SystemRecord record = table.getSystemRecord();
+ reader = new SystemRecordReader(context, record);
+ } else {
+ final Iterator<Object> iter = table.getLocalIterator(context);
+ final PojoDataType type = (PojoDataType) table.getDataType();
+ reader = new PojoRecordReader(type.getPojoClass(), iter);
+ }
- return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+ return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 2c70fd4..13e0ff6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -18,45 +18,51 @@
package org.apache.drill.exec.store.sys;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import net.hydromatic.optiq.SchemaPlus;
-
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.DrillConfigIterator;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+/**
+ * A "storage" plugin for system tables.
+ */
+public class SystemTablePlugin extends AbstractStoragePlugin {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
-public class SystemTablePlugin extends AbstractStoragePlugin{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
+ public static final String SYS_SCHEMA_NAME = "sys";
private final DrillbitContext context;
private final String name;
+ private final SystemTablePluginConfig config;
+ private final SystemSchema schema = new SystemSchema();
- public SystemTablePlugin(SystemTablePluginConfig configuration, DrillbitContext context, String name){
+ public SystemTablePlugin(SystemTablePluginConfig config, DrillbitContext context, String name) {
+ this.config = config;
this.context = context;
this.name = name;
}
- private SystemSchema schema = new SystemSchema();
-
@Override
public StoragePluginConfig getConfig() {
- return SystemTablePluginConfig.INSTANCE;
+ return config;
+ }
+
+ @JsonIgnore
+ public DrillbitContext getContext() {
+ return this.context;
}
@Override
@@ -64,35 +70,23 @@ public class SystemTablePlugin extends AbstractStoragePlugin{
parent.add(schema.getName(), schema);
}
- public Iterator<Object> getRecordIterator(FragmentContext context, SystemTable table){
- switch(table){
- case VERSION:
- return new VersionIterator();
- case DRILLBITS:
- return new DrillbitIterator(context);
- case OPTION:
- return Iterables.concat((Iterable<Object>)(Object) new DrillConfigIterator(context.getConfig()), //
- context.getOptions()).iterator();
- default:
- throw new UnsupportedOperationException("Unable to create record iterator for table: " + table.getTableName());
- }
- }
-
-
@Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
return new SystemTableScan(table, this);
}
- private class SystemSchema extends AbstractSchema{
+ /**
+ * This class defines a namespace for {@link org.apache.drill.exec.store.sys.SystemTable}s
+ */
+ private class SystemSchema extends AbstractSchema {
- private Set<String> tableNames;
+ private final Set<String> tableNames;
public SystemSchema() {
- super(ImmutableList.<String>of(), "sys");
+ super(ImmutableList.<String>of(), SYS_SCHEMA_NAME);
Set<String> names = Sets.newHashSet();
- for(SystemTable t : SystemTable.values()){
+ for (SystemTable t : SystemTable.values()) {
names.add(t.getTableName());
}
this.tableNames = ImmutableSet.copyOf(names);
@@ -103,16 +97,14 @@ public class SystemTablePlugin extends AbstractStoragePlugin{
return tableNames;
}
-
@Override
public DrillTable getTable(String name) {
- for(SystemTable table : SystemTable.values()){
- if(table.getTableName().equalsIgnoreCase(name)){
- return new StaticDrillTable(table.getType(), SystemTablePlugin.this.name, SystemTablePlugin.this, table);
+ for (SystemTable table : SystemTable.values()) {
+ if (table.getTableName().equalsIgnoreCase(name)) {
+ return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table, table.getDataType());
}
}
return null;
-
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
index 93fe68e..b3348a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -19,15 +19,17 @@ package org.apache.drill.exec.store.sys;
import org.apache.drill.common.logical.StoragePluginConfig;
+/**
+ * A namesake plugin configuration for system tables.
+ */
public class SystemTablePluginConfig extends StoragePluginConfig {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
-
- public static String NAME = "system-tables";
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
- public static SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+ public static final String NAME = "system-tables";
- private SystemTablePluginConfig(){
+ public static final SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+ private SystemTablePluginConfig() {
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index cdd0d18..ce029ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -37,12 +38,13 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("sys")
-public class SystemTableScan extends AbstractGroupScan implements SubScan{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
+public class SystemTableScan extends AbstractGroupScan implements SubScan {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
private final SystemTable table;
private final SystemTablePlugin plugin;
@@ -51,17 +53,22 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
public SystemTableScan( //
@JsonProperty("table") SystemTable table, //
@JacksonInject StoragePluginRegistry engineRegistry //
- ) throws IOException, ExecutionSetupException {
+ ) throws IOException, ExecutionSetupException {
this.table = table;
this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
}
- public SystemTableScan(SystemTable table, SystemTablePlugin plugin){
+ public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
this.table = table;
this.plugin = plugin;
}
- public ScanStats getScanStats(){
+ /**
+ * System tables do not need stats.
+ * @return a trivial stats table
+ */
+ @Override
+ public ScanStats getScanStats() {
return ScanStats.TRIVIAL_TABLE;
}
@@ -79,9 +86,22 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
return this;
}
+ // If distributed, the scan needs to happen on every node.
@Override
public int getMaxParallelizationWidth() {
- return 1;
+ return table.isDistributed() ? plugin.getContext().getBits().size() : 1;
+ }
+
+ // If distributed, the scan needs to happen on every node.
+ @Override
+ public int getMinParallelizationWidth() {
+ return table.isDistributed() ? plugin.getContext().getBits().size() : 1;
+ }
+
+ // This enforces maximum parallelization width.
+ @Override
+ public boolean enforceWidth() {
+ return true;
}
@Override
@@ -96,7 +116,8 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
@Override
public String getDigest() {
- return "SystemTableScan: " + table.name();
+ return "SystemTableScan [table=" + table.name() +
+ ", distributed=" + table.isDistributed() + "]";
}
@Override
@@ -104,6 +125,25 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE;
}
+ /**
+ * If distributed, the scan needs to happen on every node. Since width is enforced, the number of fragments equals
+ * the number of Drillbits. Here we set endpoint affinities to Double.POSITIVE_INFINITY to ensure that every
+ * Drillbit executes a fragment.
+ * @return the Drillbit endpoint affinities
+ */
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ if (table.isDistributed()) {
+ final List<EndpointAffinity> affinities = Lists.newArrayList();
+ for (final DrillbitEndpoint endpoint : plugin.getContext().getBits()) {
+ affinities.add(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+ }
+ return affinities;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
@Override
public GroupScan clone(List<SchemaPath> columns) {
return this;
@@ -113,9 +153,9 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
return table;
}
+ @JsonIgnore
public SystemTablePlugin getPlugin() {
return plugin;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
new file mode 100644
index 0000000..b184880
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit threads
+ */
+public class ThreadsRecord extends SystemRecord {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ThreadsRecord.class);
+
+ private static final ThreadsRecord INSTANCE = new ThreadsRecord();
+
+ public static SystemRecord getInstance() {
+ return INSTANCE;
+ }
+
+ private static final String HOST_NAME = "hostname";
+ private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
+ Types.required(TypeProtos.MinorType.VARCHAR));
+ private VarCharVector hostName;
+
+ private static final String USER_PORT = "user_port";
+ private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector userPort;
+
+ private static final String TOTAL_THREADS = "total_threads";
+ private static final MaterializedField totalThreadsField = MaterializedField.create(TOTAL_THREADS,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector totalThreads;
+
+ private static final String BUSY_THREADS = "busy_threads";
+ private static final MaterializedField busyThreadsField = MaterializedField.create(BUSY_THREADS,
+ Types.required(TypeProtos.MinorType.BIGINT));
+ private BigIntVector busyThreads;
+
+ private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
+ SqlTypeName.BIGINT, SqlTypeName.BIGINT);
+
+ private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, TOTAL_THREADS, BUSY_THREADS);
+
+ private ThreadsRecord() {
+ }
+
+ @Override
+ public void setup(final OutputMutator output) throws SchemaChangeException {
+ hostName = output.addField(hostNameField, VarCharVector.class);
+ userPort = output.addField(userPortField, BigIntVector.class);
+ totalThreads = output.addField(totalThreadsField, BigIntVector.class);
+ busyThreads = output.addField(busyThreadsField, BigIntVector.class);
+ }
+
+ @Override
+ public void setRecordValues(final FragmentContext context) {
+ final DrillbitContext drillbitContext = context.getDrillbitContext();
+ final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+ final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+ final String address = endpoint.getAddress();
+ final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
+ hostNameMutator.setSafe(0, address.getBytes());
+ hostNameMutator.setValueCount(1);
+
+ final int port = endpoint.getUserPort();
+ final BigIntVector.Mutator userPortMutator = userPort.getMutator();
+ userPortMutator.setSafe(0, port);
+ userPortMutator.setValueCount(1);
+
+ final BigIntVector.Mutator totalThreadsMutator = totalThreads.getMutator();
+ totalThreadsMutator.setSafe(0, threadMXBean.getPeakThreadCount());
+ totalThreadsMutator.setValueCount(1);
+
+ final BigIntVector.Mutator busyThreadsMutator = busyThreads.getMutator();
+ busyThreadsMutator.setSafe(0, threadMXBean.getThreadCount());
+ busyThreadsMutator.setValueCount(1);
+ }
+
+ @Override
+ public List<SqlTypeName> getFieldSqlTypeNames() {
+ return FIELDS;
+ }
+
+ @Override
+ public List<String> getFieldNames() {
+ return NAMES;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8ab361b1/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index c1803bc..4f4d29b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -44,4 +44,16 @@ public class TestSystemTable extends BaseTestQuery {
.baselineValues(true)
.go();
}
+
+ // need to enhance this
+ @Test
+ public void testThreadsTable() throws Exception {
+ test("select * from sys.threads");
+ }
+
+ // need to enhance this
+ @Test
+ public void testMemoryTable() throws Exception {
+ test("select * from sys.memory");
+ }
}
[2/5] drill git commit: DRILL-2323: Added parquet metadata to logs
(+filename to JSON reader)
Posted by ve...@apache.org.
DRILL-2323: Added parquet metadata to logs (+filename to JSON reader)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5ec7806
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5ec7806
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5ec7806
Branch: refs/heads/master
Commit: c5ec7806384de9acec61ff2ef133c0416b7a2cdb
Parents: f447a9d
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Tue Mar 3 11:32:57 2015 -0800
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700
----------------------------------------------------------------------
.../exec/store/easy/json/JSONRecordReader.java | 4 +--
.../store/parquet/ParquetScanBatchCreator.java | 2 +-
.../columnreaders/ParquetRecordReader.java | 24 ++++++++++----
.../exec/store/parquet2/DrillParquetReader.java | 14 ++++++--
.../parquet/hadoop/ColumnChunkIncReadStore.java | 35 +++++++++++++++-----
5 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 6fbdf4f..cc7cb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class JSONRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
private OutputMutator mutator;
private VectorContainerWriter writer;
@@ -92,7 +92,7 @@ public class JSONRecordReader extends AbstractRecordReader {
protected void handleAndRaise(String msg, Exception e) {
StringBuilder sb = new StringBuilder();
- sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
+ sb.append(msg).append(" - In ").append(hadoopPath.toUri().getPath()).append(" parser was at record: ").append(recordCount+1);
if (e instanceof JsonParseException) {
JsonParseException ex = JsonParseException.class.cast(e);
sb.append(" column: ").append(ex.getLocation().getColumnNr());
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index ad1bf32..3ae2b36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Lists;
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 5b9212c..11d0042 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -60,7 +60,7 @@ import parquet.schema.PrimitiveType;
import com.google.common.collect.Lists;
public class ParquetRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
// this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
private static final int NUMBER_OF_VECTORS = 1;
@@ -319,13 +319,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
}
}
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
} catch (Exception e) {
- throw new ExecutionSetupException(e);
+ handleAndRaise("Failure in setting up reader", e);
}
}
+ protected void handleAndRaise(String s, Exception e) {
+ String message = "Error in parquet record reader.\nMessage: " + s +
+ "\nParquet Metadata: " + footer;
+ throw new DrillRuntimeException(message, e);
+ }
+
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
try {
@@ -424,9 +428,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
// logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
return firstColumnStatus.getRecordsReadInCurrentPass();
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
+ } catch (Exception e) {
+ handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
+ "\nTotal records read: " + totalRecordsRead +
+ "\nMock records read: " + mockRecordsRead +
+ "\nRecords to read: " + recordsToRead +
+ "\nRow group index: " + rowGroupIndex +
+ "\nRecords in row group: " + footer.getBlocks().get(rowGroupIndex).getRowCount(), e);
}
+
+ // this is never reached
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 8778ef8..9d85b67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Set;
import com.google.common.collect.Sets;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
@@ -75,7 +76,7 @@ import parquet.schema.Types;
public class DrillParquetReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
// same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
@@ -270,10 +271,16 @@ public class DrillParquetReader extends AbstractRecordReader {
recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
}
} catch (Exception e) {
- throw new ExecutionSetupException(e);
+ handleAndRaise("Failure in setting up reader", e);
}
}
+ protected void handleAndRaise(String s, Exception e) {
+ String message = "Error in drill parquet reader (complex).\nMessage: " + s +
+ "\nParquet Metadata: " + footer;
+ throw new DrillRuntimeException(message, e);
+ }
+
private static Type getType(String[] pathSegments, int depth, MessageType schema) {
Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
if (depth + 1 == pathSegments.length) {
@@ -312,7 +319,8 @@ public class DrillParquetReader extends AbstractRecordReader {
if (count % fillLevelCheckFrequency == 0) {
if (getPercentFilled() > fillLevelCheckThreshold) {
if(!recordMaterializer.ok()){
- throw new RuntimeException(String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD));
+ String message = String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD);
+ handleAndRaise(message, new RuntimeException(message));
}
break;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index f2fe376..1125435 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -92,9 +93,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
@Override
public DictionaryPage readDictionaryPage() {
if (dictionaryPage == null) {
+ PageHeader pageHeader = new PageHeader();
+ long pos = 0;
try {
- long pos = in.getPos();
- PageHeader pageHeader = Util.readPageHeader(in);
+ pos = in.getPos();
+ pageHeader = Util.readPageHeader(in);
if (pageHeader.getDictionary_page_header() == null) {
in.seek(pos);
return null;
@@ -105,8 +108,16 @@ public class ColumnChunkIncReadStore implements PageReadStore {
pageHeader.getDictionary_page_header().getNum_values(),
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error reading dictionary page." +
+ "\nFile path: " + path.toUri().getPath() +
+ "\nRow count: " + rowCount +
+ "\nColumn Chunk Metadata: " + metaData +
+ "\nPage Header: " + pageHeader +
+ "\nFile offset: " + fileOffset +
+ "\nSize: " + size +
+ "\nValue read so far: " + valueReadSoFar +
+ "\nPosition: " + pos, e);
}
}
return dictionaryPage;
@@ -119,13 +130,14 @@ public class ColumnChunkIncReadStore implements PageReadStore {
@Override
public Page readPage() {
+ PageHeader pageHeader = new PageHeader();
try {
if (lastPage != null) {
lastPage.release();
lastPage = null;
}
while(valueReadSoFar < metaData.getValueCount()) {
- PageHeader pageHeader = Util.readPageHeader(in);
+ pageHeader = Util.readPageHeader(in);
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage == null) {
@@ -151,7 +163,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
- parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+ ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -163,8 +175,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
}
in.close();
return null;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error reading page." +
+ "\nFile path: " + path.toUri().getPath() +
+ "\nRow count: " + rowCount +
+ "\nColumn Chunk Metadata: " + metaData +
+ "\nPage Header: " + pageHeader +
+ "\nFile offset: " + fileOffset +
+ "\nSize: " + size +
+ "\nValue read so far: " + valueReadSoFar, e);
}
}
[5/5] drill git commit: DRILL-2322: Adding file split information to
logs
Posted by ve...@apache.org.
DRILL-2322: Adding file split information to logs
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f447a9d5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f447a9d5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f447a9d5
Branch: refs/heads/master
Commit: f447a9d53e96831e4adbc3316bfd224954381fff
Parents: 8ab361b
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Mon Mar 2 09:42:48 2015 -0800
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/ops/FragmentContext.java | 4 ++-
.../exec/store/text/DrillTextRecordReader.java | 30 ++++++++++++++------
2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f447a9d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 5e31e5c..a4ac724 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -128,7 +128,9 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
}
public void fail(Throwable cause) {
- logger.error("Fragment Context received failure.", cause);
+ final FragmentHandle fragmentHandle = fragment.getHandle();
+ logger.error("Fragment Context received failure -- Fragment: {}:{}",
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), cause);
setState(FragmentContextState.FAILED);
deferredException.addThrowable(cause);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f447a9d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 7c1f888..1ad053d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.text;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -33,7 +32,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -68,10 +66,13 @@ public class DrillTextRecordReader extends AbstractRecordReader {
private LongWritable key;
private Text value;
private int numCols = 0;
+ private FileSplit split;
+ private long totalRecordsRead;
public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
this.fragmentContext = context;
this.delimiter = (byte) delimiter;
+ this.split = split;
setColumns(columns);
if (!isStarQuery()) {
@@ -101,8 +102,9 @@ public class DrillTextRecordReader extends AbstractRecordReader {
reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
key = reader.createKey();
value = reader.createValue();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ totalRecordsRead = 0;
+ } catch (Exception e) {
+ handleAndRaise("Failure in creating record reader", e);
}
}
@@ -130,11 +132,19 @@ public class DrillTextRecordReader extends AbstractRecordReader {
MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
try {
vector = output.addField(field, RepeatedVarCharVector.class);
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
+ } catch (Exception e) {
+ handleAndRaise("Failure in setting up reader", e);
}
}
+ protected void handleAndRaise(String s, Exception e) {
+ String message = "Error in text record reader.\nMessage: " + s +
+ "\nSplit information:\n\tPath: " + split.getPath() +
+ "\n\tStart: " + split.getStart() +
+ "\n\tLength: " + split.getLength();
+ throw new DrillRuntimeException(message, e);
+ }
+
@Override
public int next() {
// logger.debug("vector value capacity {}", vector.getValueCapacity());
@@ -173,6 +183,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
batchSize += end - start;
}
recordCount++;
+ totalRecordsRead++;
}
for (ValueVector v : vectors) {
v.getMutator().setValueCount(recordCount);
@@ -180,10 +191,13 @@ public class DrillTextRecordReader extends AbstractRecordReader {
vector.getMutator().setValueCount(recordCount);
logger.debug("text scan batch size {}", batchSize);
return recordCount;
- } catch (IOException e) {
+ } catch(Exception e) {
cleanup();
- throw new DrillRuntimeException(e);
+ handleAndRaise("Failure while parsing text. Parser was at record: " + (totalRecordsRead + 1), e);
}
+
+ // this is never reached
+ return 0;
}
/**
[4/5] drill git commit: DRILL-2572: Use PrelUtil to get
PlannerSettings for PruneScanRule.
Posted by ve...@apache.org.
DRILL-2572: Use PrelUtil to get PlannerSettings for PruneScanRule.
+ context.getPlannerSettings() returns null sometimes
+ introduced in commit 48c9c01
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/50ad974e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/50ad974e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/50ad974e
Branch: refs/heads/master
Commit: 50ad974e9a5c960f6027fb61d39e537c87fd7f14
Parents: 06d7eb2
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Mar 25 17:18:57 2015 -0700
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700
----------------------------------------------------------------------
.../drill/exec/planner/logical/partition/PruneScanRule.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/50ad974e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index b8c9ebf..413259d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -119,7 +120,7 @@ public abstract class PruneScanRule extends RelOptRule {
}
protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
- PlannerSettings settings = context.getPlannerSettings();
+ final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
FileSystemPartitionDescriptor descriptor = new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
final BufferAllocator allocator = context.getAllocator();
[3/5] drill git commit: DRILL-2367: Removed partition column label in
conf (gets from options)
Posted by ve...@apache.org.
DRILL-2367: Removed partition column label in conf (gets from options)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/06d7eb28
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/06d7eb28
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/06d7eb28
Branch: refs/heads/master
Commit: 06d7eb28e75d72ccb7d1c755a43b891212353221
Parents: c5ec780
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed Mar 18 16:48:36 2015 -0700
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/ExecConstants.java | 8 ++++++++
.../java/org/apache/drill/exec/physical/impl/ScanBatch.java | 9 +++++++--
.../apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 3 ++-
.../drill/exec/store/parquet/ParquetScanBatchCreator.java | 3 ++-
exec/java-exec/src/main/resources/drill-module.conf | 3 +--
5 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/06d7eb28/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index cd0a0a2..14e6ad1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -105,8 +105,16 @@ public interface ExecConstants {
public static String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
public static OptionValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE, false);
+ /**
+ * The column label (for directory levels) in results when querying files in a directory
+ * E.g. labels: dir0 dir1
+ * structure: foo
+ * |- bar - a.parquet
+ * |- baz - b.parquet
+ */
public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
public static final OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");
+
public static String MONGO_ALL_TEXT_MODE = "store.mongo.all_text_mode";
public static OptionValidator MONGO_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(MONGO_ALL_TEXT_MODE, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/06d7eb28/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6b7294d..ca2a048 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
@@ -103,8 +104,12 @@ public class ScanBatch implements RecordBatch {
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;
- DrillConfig config = context.getConfig();
- this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+
+ // TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
+ // options; so labelValue = null.
+ final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ this.partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
+
addPartitionVectors();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/06d7eb28/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 431b362..6e1e0cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -118,7 +118,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
List<SchemaPath> columns) throws ExecutionSetupException;
RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
- String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ String partitionDesignator = context.getOptions()
+ .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<SchemaPath> columns = scan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
List<String[]> partitionColumns = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/06d7eb28/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 3ae2b36..c1f815e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -63,7 +63,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
@Override
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ String partitionDesignator = context.getOptions()
+ .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<SchemaPath> columns = rowGroupScan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
OperatorContext oContext = new OperatorContext(rowGroupScan, context,
http://git-wip-us.apache.org/repos/asf/drill/blob/06d7eb28/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9e807b0..af225c4 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -60,8 +60,7 @@ drill.exec: {
text: {
buffer.size: 262144,
batch.size: 4000
- },
- partition.column.label: "dir"
+ }
}
},
metrics : {