You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/04/14 22:54:01 UTC
[1/2] drill git commit: DRILL-2781: Protobuf changes for nested loop join
Repository: drill
Updated Branches:
refs/heads/master 3b5a87e89 -> 7cee11c2b
DRILL-2781: Protobuf changes for nested loop join
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5441e72c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5441e72c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5441e72c
Branch: refs/heads/master
Commit: 5441e72c0d97e8ccd7c196f5a9f6f23fdc8d2b32
Parents: 3b5a87e
Author: Mehant Baid <me...@gmail.com>
Authored: Tue Apr 14 09:36:32 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Tue Apr 14 09:36:32 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/drill/exec/proto/UserBitShared.java | 6 +++---
.../org/apache/drill/exec/proto/beans/CoreOperatorType.java | 4 +++-
2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/5441e72c/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index ac1bcbb..96a921b 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -19688,7 +19688,7 @@ public final class UserBitShared {
"ype\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*" +
"k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITIN",
"G_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020" +
- "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\264\005\n\020CoreOpe" +
+ "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\312\005\n\020CoreOpe" +
"ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
"T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" +
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" +
@@ -19706,8 +19706,8 @@ public final class UserBitShared {
"\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB" +
"_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER" +
"_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDO" +
- "W\020\"B.\n\033org.apache.drill.exec.protoB\rUser",
- "BitSharedH\001"
+ "W\020\"\022\024\n\020NESTED_LOOP_JOIN\020#B.\n\033org.apache.",
+ "drill.exec.protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/5441e72c/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 7475cdc..a37209d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -56,7 +56,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
COMPLEX_TO_JSON(31),
PRODUCER_CONSUMER(32),
HBASE_SUB_SCAN(33),
- WINDOW(34);
+ WINDOW(34),
+ NESTED_LOOP_JOIN(35);
public final int number;
@@ -109,6 +110,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
case 32: return PRODUCER_CONSUMER;
case 33: return HBASE_SUB_SCAN;
case 34: return WINDOW;
+ case 35: return NESTED_LOOP_JOIN;
default: return null;
}
}
[2/2] drill git commit: DRILL-2766: Removed SystemRecordReader and SystemRecords + Use PojoRecordReader with ThreadsIterator and MemoryIterator + Includes DRILL-2670: Added a test case since DRILL-2714 resolves this issue
Posted by me...@apache.org.
DRILL-2766: Removed SystemRecordReader and SystemRecords
+ Use PojoRecordReader with ThreadsIterator and MemoryIterator
+ Includes DRILL-2670: Added a test case since DRILL-2714 resolves this issue
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7cee11c2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7cee11c2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7cee11c2
Branch: refs/heads/master
Commit: 7cee11c2bf24b0dd532f1068526bf7caecc9a935
Parents: 5441e72
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri Apr 10 14:07:39 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Tue Apr 14 10:56:26 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/sys/MemoryIterator.java | 77 ++++++++++
.../drill/exec/store/sys/MemoryRecord.java | 141 -------------------
.../drill/exec/store/sys/SystemRecord.java | 44 ------
.../exec/store/sys/SystemRecordReader.java | 77 ----------
.../drill/exec/store/sys/SystemTable.java | 46 +++---
.../exec/store/sys/SystemTableBatchCreator.java | 12 +-
.../drill/exec/store/sys/SystemTablePlugin.java | 4 +-
.../drill/exec/store/sys/ThreadsIterator.java | 70 +++++++++
.../drill/exec/store/sys/ThreadsRecord.java | 119 ----------------
.../drill/exec/store/sys/TestSystemTable.java | 14 +-
10 files changed, 180 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
new file mode 100644
index 0000000..4ceeebe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.util.Iterator;
+
+public class MemoryIterator implements Iterator<Object> {
+
+ private boolean beforeFirst = true;
+ private final FragmentContext context;
+
+ public MemoryIterator(final FragmentContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return beforeFirst;
+ }
+
+ @Override
+ public Object next() {
+ if (!beforeFirst) {
+ throw new IllegalStateException();
+ }
+ beforeFirst = false;
+ final MemoryInfo memoryInfo = new MemoryInfo();
+
+ final DrillbitEndpoint endpoint = context.getIdentity();
+ memoryInfo.hostname = endpoint.getAddress();
+ memoryInfo.user_port = endpoint.getUserPort();
+
+ final MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ memoryInfo.heap_current = heapMemoryUsage.getUsed();
+ memoryInfo.heap_max = heapMemoryUsage.getMax();
+
+ memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
+ memoryInfo.direct_max = TopLevelAllocator.MAXIMUM_DIRECT_MEMORY;
+ return memoryInfo;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class MemoryInfo {
+ public String hostname;
+ public long user_port;
+ public long heap_current;
+ public long heap_max;
+ public long direct_current;
+ public long direct_max;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
deleted file mode 100644
index 9cb001d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryRecord.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.eigenbase.sql.type.SqlTypeName;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.util.List;
-
-/**
- * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit memory
- */
-public class MemoryRecord extends SystemRecord {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryRecord.class);
-
- private static final MemoryRecord INSTANCE = new MemoryRecord();
-
- public static SystemRecord getInstance() {
- return INSTANCE;
- }
-
- private static final String HOST_NAME = "hostname";
- private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
- Types.required(TypeProtos.MinorType.VARCHAR));
- private VarCharVector hostName;
-
- private static final String USER_PORT = "user_port";
- private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector userPort;
-
- private static final String CURRENT_HEAP_SIZE = "heap_current";
- private static final MaterializedField currentHeapSizeField = MaterializedField.create(CURRENT_HEAP_SIZE,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector currentHeapSize;
-
- private static final String MAX_HEAP_SIZE = "heap_max";
- private static final MaterializedField maxHeapSizeField = MaterializedField.create(MAX_HEAP_SIZE,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector maxHeapSize;
-
- private static final String CURRENT_DIRECT_MEMORY = "direct_current";
- private static final MaterializedField currentDirectMemoryField = MaterializedField.create(CURRENT_DIRECT_MEMORY,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector currentDirectMemory;
-
- private static final String MAX_DIRECT_MEMORY = "direct_max";
- private static final MaterializedField maxDirectMemoryField = MaterializedField.create(MAX_DIRECT_MEMORY,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector maxDirectMemory;
-
- private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
- SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT, SqlTypeName.BIGINT);
-
- private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, CURRENT_HEAP_SIZE, MAX_HEAP_SIZE,
- CURRENT_DIRECT_MEMORY, MAX_DIRECT_MEMORY);
-
- private MemoryRecord() {
- }
-
- @Override
- public void setup(final OutputMutator output) throws SchemaChangeException {
- hostName = output.addField(hostNameField, VarCharVector.class);
- userPort = output.addField(userPortField, BigIntVector.class);
- currentHeapSize = output.addField(currentHeapSizeField, BigIntVector.class);
- maxHeapSize = output.addField(maxHeapSizeField, BigIntVector.class);
- currentDirectMemory = output.addField(currentDirectMemoryField, BigIntVector.class);
- maxDirectMemory = output.addField(maxDirectMemoryField, BigIntVector.class);
- }
-
- @Override
- public void setRecordValues(final FragmentContext context) {
- final DrillbitContext drillbitContext = context.getDrillbitContext();
- final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-
- final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
- final String address = endpoint.getAddress();
- final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
- hostNameMutator.setSafe(0, address.getBytes());
- hostNameMutator.setValueCount(1);
-
- final int port = endpoint.getUserPort();
- final BigIntVector.Mutator userPortMutator = userPort.getMutator();
- userPortMutator.setSafe(0, port);
- userPortMutator.setValueCount(1);
-
- final BigIntVector.Mutator currentHeapSizeMutator = currentHeapSize.getMutator();
- currentHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getUsed());
- currentHeapSizeMutator.setValueCount(1);
-
- final BigIntVector.Mutator maxHeapSizeMutator = maxHeapSize.getMutator();
- maxHeapSizeMutator.setSafe(0, memoryMXBean.getHeapMemoryUsage().getMax());
- maxHeapSizeMutator.setValueCount(1);
-
- final BigIntVector.Mutator currentDirectMemoryMutator = currentDirectMemory.getMutator();
- currentDirectMemoryMutator.setSafe(0, drillbitContext.getAllocator().getAllocatedMemory());
- currentDirectMemoryMutator.setValueCount(1);
-
- final BigIntVector.Mutator maxDirectMemoryMutator = maxDirectMemory.getMutator();
- maxDirectMemoryMutator.setSafe(0, TopLevelAllocator.MAXIMUM_DIRECT_MEMORY);
- maxDirectMemoryMutator.setValueCount(1);
- }
-
- @Override
- public List<SqlTypeName> getFieldSqlTypeNames() {
- return FIELDS;
- }
-
- @Override
- public List<String> getFieldNames() {
- return NAMES;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
deleted file mode 100644
index 5bdb9b1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecord.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.RecordDataType;
-
-/**
- * A system record holds system information (e.g. memory usage).
- * Currently, there is only one such system record per Drillbit.
- */
-public abstract class SystemRecord extends RecordDataType {
-
- /**
- * Setup value vectors to hold system information
- * @param output the mutator from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
- * @throws SchemaChangeException
- */
- public abstract void setup(OutputMutator output) throws SchemaChangeException;
-
- /**
- * Set the values of value vectors when requested
- * @param context the context from {@link org.apache.drill.exec.store.sys.SystemRecordReader}
- */
- public abstract void setRecordValues(FragmentContext context);
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
deleted file mode 100644
index 9f8d0d9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemRecordReader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-
-/**
- * A record reader to populate a {@link SystemRecord}.
- */
-public class SystemRecordReader extends AbstractRecordReader {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemRecordReader.class);
-
- private final FragmentContext fragmentContext;
- private final SystemRecord record;
- private boolean read;
-
- private OperatorContext operatorContext;
-
- public SystemRecordReader(FragmentContext context, SystemRecord record) {
- this.fragmentContext = context;
- this.record = record;
- this.read = false;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- try {
- record.setup(output);
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
-
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public int next() {
- // send only one record
- if (!read) {
- record.setRecordValues(fragmentContext);
- read = true;
- return 1;
- }
- return 0;
- }
-
- @Override
- public void cleanup() {
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 2c338ca..e2ac9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -22,8 +22,6 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.server.options.DrillConfigIterator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.store.RecordDataType;
-import org.apache.drill.exec.store.pojo.PojoDataType;
import java.util.Iterator;
@@ -35,40 +33,40 @@ import java.util.Iterator;
*/
public enum SystemTable {
- OPTION("options", false, new PojoDataType(OptionValue.class)) {
+ OPTION("options", false, OptionValue.class) {
@Override
- public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final FragmentContext context) {
final DrillConfigIterator configOptions = new DrillConfigIterator(context.getConfig());
final OptionManager fragmentOptions = context.getOptions();
return (Iterator<Object>) (Object) Iterators.concat(configOptions.iterator(), fragmentOptions.iterator());
}
},
- DRILLBITS("drillbits", false, new PojoDataType(DrillbitIterator.DrillbitInstance.class)) {
+ DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) {
@Override
- public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final FragmentContext context) {
return new DrillbitIterator(context);
}
},
- VERSION("version", false, new PojoDataType(VersionIterator.VersionInfo.class)) {
+ VERSION("version", false, VersionIterator.VersionInfo.class) {
@Override
- public Iterator<Object> getLocalIterator(final FragmentContext context) {
+ public Iterator<Object> getIterator(final FragmentContext context) {
return new VersionIterator();
}
},
- MEMORY("memory", true, MemoryRecord.getInstance()) {
+ MEMORY("memory", true, MemoryIterator.MemoryInfo.class) {
@Override
- public SystemRecord getSystemRecord() {
- return MemoryRecord.getInstance();
+ public Iterator<Object> getIterator(final FragmentContext context) {
+ return new MemoryIterator(context);
}
},
- THREADS("threads", true, ThreadsRecord.getInstance()) {
+ THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) {
@Override
- public SystemRecord getSystemRecord() {
- return ThreadsRecord.getInstance();
+ public Iterator<Object> getIterator(final FragmentContext context) {
+ return new ThreadsIterator(context);
}
};
@@ -76,22 +74,16 @@ public enum SystemTable {
private final String tableName;
private final boolean distributed;
- private final RecordDataType dataType;
+ private final Class<?> pojoClass;
- SystemTable(String tableName, boolean distributed, RecordDataType dataType) {
+ SystemTable(final String tableName, final boolean distributed, final Class<?> pojoClass) {
this.tableName = tableName;
this.distributed = distributed;
- this.dataType = dataType;
+ this.pojoClass = pojoClass;
}
- // Distributed tables must override this method
- public SystemRecord getSystemRecord() {
- throw new UnsupportedOperationException("Local table does not support this function.");
- }
-
- // Local tables must override this method
- public Iterator<Object> getLocalIterator(FragmentContext context) {
- throw new UnsupportedOperationException("Distributed table does not support this function.");
+ public Iterator<Object> getIterator(final FragmentContext context) {
+ throw new UnsupportedOperationException(tableName + " must override this method.");
}
public String getTableName() {
@@ -102,8 +94,8 @@ public enum SystemTable {
return distributed;
}
- public RecordDataType getDataType() {
- return dataType;
+ public Class getPojoClass() {
+ return pojoClass;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 0152b63..92f676a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.pojo.PojoDataType;
import org.apache.drill.exec.store.pojo.PojoRecordReader;
/**
@@ -44,15 +43,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
final List<RecordBatch> children)
throws ExecutionSetupException {
final SystemTable table = scan.getTable();
- final RecordReader reader;
- if (table.isDistributed()) {
- final SystemRecord record = table.getSystemRecord();
- reader = new SystemRecordReader(context, record);
- } else {
- final Iterator<Object> iter = table.getLocalIterator(context);
- final PojoDataType type = (PojoDataType) table.getDataType();
- reader = new PojoRecordReader(type.getPojoClass(), iter);
- }
+ final Iterator<Object> iterator = table.getIterator(context);
+ final RecordReader reader = new PojoRecordReader(table.getPojoClass(), iterator);
return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 13e0ff6..b92f98c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
/**
* A "storage" plugin for system tables.
@@ -101,7 +102,8 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
public DrillTable getTable(String name) {
for (SystemTable table : SystemTable.values()) {
if (table.getTableName().equalsIgnoreCase(name)) {
- return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table, table.getDataType());
+ return new StaticDrillTable(SystemTablePlugin.this.name, SystemTablePlugin.this, table,
+ new PojoDataType(table.getPojoClass()));
}
}
return null;
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
new file mode 100644
index 0000000..e9bc7ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.Iterator;
+
+public class ThreadsIterator implements Iterator<Object> {
+
+ private boolean beforeFirst = true;
+ private final FragmentContext context;
+
+ public ThreadsIterator(final FragmentContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return beforeFirst;
+ }
+
+ @Override
+ public Object next() {
+ if (!beforeFirst) {
+ throw new IllegalStateException();
+ }
+ beforeFirst = false;
+ final ThreadsInfo threadsInfo = new ThreadsInfo();
+
+ final DrillbitEndpoint endpoint = context.getIdentity();
+ threadsInfo.hostname = endpoint.getAddress();
+ threadsInfo.user_port = endpoint.getUserPort();
+
+ final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ threadsInfo.total_threads = threadMXBean.getPeakThreadCount();
+ threadsInfo.busy_threads = threadMXBean.getThreadCount();
+ return threadsInfo;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class ThreadsInfo {
+ public String hostname;
+ public long user_port;
+ public long total_threads;
+ public long busy_threads;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
deleted file mode 100644
index b184880..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsRecord.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.eigenbase.sql.type.SqlTypeName;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.util.List;
-
-/**
- * A {@link org.apache.drill.exec.store.sys.SystemRecord} that holds information about drillbit threads
- */
-public class ThreadsRecord extends SystemRecord {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ThreadsRecord.class);
-
- private static final ThreadsRecord INSTANCE = new ThreadsRecord();
-
- public static SystemRecord getInstance() {
- return INSTANCE;
- }
-
- private static final String HOST_NAME = "hostname";
- private static final MaterializedField hostNameField = MaterializedField.create(HOST_NAME,
- Types.required(TypeProtos.MinorType.VARCHAR));
- private VarCharVector hostName;
-
- private static final String USER_PORT = "user_port";
- private static final MaterializedField userPortField = MaterializedField.create(USER_PORT,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector userPort;
-
- private static final String TOTAL_THREADS = "total_threads";
- private static final MaterializedField totalThreadsField = MaterializedField.create(TOTAL_THREADS,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector totalThreads;
-
- private static final String BUSY_THREADS = "busy_threads";
- private static final MaterializedField busyThreadsField = MaterializedField.create(BUSY_THREADS,
- Types.required(TypeProtos.MinorType.BIGINT));
- private BigIntVector busyThreads;
-
- private static final List<SqlTypeName> FIELDS = Lists.newArrayList(SqlTypeName.VARCHAR, SqlTypeName.BIGINT,
- SqlTypeName.BIGINT, SqlTypeName.BIGINT);
-
- private static final List<String> NAMES = Lists.newArrayList(HOST_NAME, USER_PORT, TOTAL_THREADS, BUSY_THREADS);
-
- private ThreadsRecord() {
- }
-
- @Override
- public void setup(final OutputMutator output) throws SchemaChangeException {
- hostName = output.addField(hostNameField, VarCharVector.class);
- userPort = output.addField(userPortField, BigIntVector.class);
- totalThreads = output.addField(totalThreadsField, BigIntVector.class);
- busyThreads = output.addField(busyThreadsField, BigIntVector.class);
- }
-
- @Override
- public void setRecordValues(final FragmentContext context) {
- final DrillbitContext drillbitContext = context.getDrillbitContext();
- final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
-
- final CoordinationProtos.DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
- final String address = endpoint.getAddress();
- final VarCharVector.Mutator hostNameMutator = hostName.getMutator();
- hostNameMutator.setSafe(0, address.getBytes());
- hostNameMutator.setValueCount(1);
-
- final int port = endpoint.getUserPort();
- final BigIntVector.Mutator userPortMutator = userPort.getMutator();
- userPortMutator.setSafe(0, port);
- userPortMutator.setValueCount(1);
-
- final BigIntVector.Mutator totalThreadsMutator = totalThreads.getMutator();
- totalThreadsMutator.setSafe(0, threadMXBean.getPeakThreadCount());
- totalThreadsMutator.setValueCount(1);
-
- final BigIntVector.Mutator busyThreadsMutator = busyThreads.getMutator();
- busyThreadsMutator.setSafe(0, threadMXBean.getThreadCount());
- busyThreadsMutator.setValueCount(1);
- }
-
- @Override
- public List<SqlTypeName> getFieldSqlTypeNames() {
- return FIELDS;
- }
-
- @Override
- public List<String> getFieldNames() {
- return NAMES;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/7cee11c2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index 4f4d29b..e86fc28 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.ExecConstants;
import org.junit.Test;
public class TestSystemTable extends BaseTestQuery {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
@Test
public void alterSessionOption() throws Exception {
@@ -45,15 +45,19 @@ public class TestSystemTable extends BaseTestQuery {
.go();
}
- // need to enhance this
+ // DRILL-2670
@Test
- public void testThreadsTable() throws Exception {
+ public void optionsOrderBy() throws Exception {
+ test("select * from sys.options order by name");
+ }
+
+ @Test
+ public void threadsTable() throws Exception {
test("select * from sys.threads");
}
- // need to enhance this
@Test
- public void testMemoryTable() throws Exception {
+ public void memoryTable() throws Exception {
test("select * from sys.memory");
}
}