You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/09/11 00:01:17 UTC
drill git commit: DRILL-1942-hygiene
Repository: drill
Updated Branches:
refs/heads/master eeaea7a8a -> fdb6b4fec
DRILL-1942-hygiene
- Formatting
- @Overrides
- finals
- some AutoCloseable additions
- new isCancelled() abstract method on FragmentManager, implemented on subclasses
Added missing new abstract method isCancelled()
Close apache/drill#120
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fdb6b4fe
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fdb6b4fe
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fdb6b4fe
Branch: refs/heads/master
Commit: fdb6b4fecee30282d8f490e78b7f2dc3a2e27347
Parents: eeaea7a
Author: Chris Westin <cw...@yahoo.com>
Authored: Mon Aug 17 15:42:28 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Thu Sep 10 14:25:43 2015 -0700
----------------------------------------------------------------------
.../drill/common/config/NestedConfig.java | 43 ++++++++-
.../exec/planner/logical/DrillWindowRel.java | 1 -
.../drill/exec/record/VectorAccessible.java | 26 ++++++
.../exec/record/selection/SelectionVector2.java | 32 +++----
.../exec/record/selection/SelectionVector4.java | 10 ++-
.../org/apache/drill/exec/rpc/RpcDecoder.java | 4 +-
.../drill/exec/rpc/user/QueryDataBatch.java | 9 +-
.../drill/exec/rpc/user/QueryResultHandler.java | 6 +-
.../apache/drill/exec/rpc/user/UserClient.java | 7 +-
.../org/apache/drill/exec/server/Drillbit.java | 15 ++--
.../easy/json/reader/BaseJsonProcessor.java | 11 ++-
.../exec/store/mock/MockScanBatchCreator.java | 10 +--
.../drill/exec/vector/AllocationHelper.java | 13 ++-
.../org/apache/drill/exec/vector/BitVector.java | 33 +++++--
.../exec/vector/complex/RepeatedListVector.java | 5 +-
.../exec/vector/complex/RepeatedMapVector.java | 93 ++++++++++----------
.../vector/complex/fn/DrillBufInputStream.java | 5 +-
.../exec/vector/complex/fn/JsonReader.java | 7 +-
.../vector/complex/impl/AbstractBaseWriter.java | 16 ++--
.../exec/vector/complex/writer/FieldWriter.java | 8 +-
.../exec/work/batch/BaseRawBatchBuffer.java | 9 ++
.../exec/work/fragment/FragmentManager.java | 30 ++++---
.../work/fragment/NonRootFragmentManager.java | 18 ++--
.../exec/work/fragment/RootFragmentManager.java | 7 +-
.../java/org/apache/drill/TestTpchPlanning.java | 49 +++++------
25 files changed, 279 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
index 3fd885f..60fe013 100644
--- a/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
@@ -30,7 +30,7 @@ import com.typesafe.config.ConfigResolveOptions;
import com.typesafe.config.ConfigValue;
abstract class NestedConfig implements Config {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);
private final Config c;
@@ -38,164 +38,203 @@ abstract class NestedConfig implements Config {
this.c = c;
}
+ @Override
public ConfigObject root() {
return c.root();
}
+ @Override
public ConfigOrigin origin() {
return c.origin();
}
+ @Override
public Config withFallback(ConfigMergeable other) {
return c.withFallback(other);
}
+ @Override
public Config resolve() {
return c.resolve();
}
+ @Override
public Config resolve(ConfigResolveOptions options) {
return c.resolve(options);
}
+ @Override
public void checkValid(Config reference, String... restrictToPaths) {
c.checkValid(reference, restrictToPaths);
}
+ @Override
public boolean hasPath(String path) {
return c.hasPath(path);
}
+ @Override
public boolean isEmpty() {
return c.isEmpty();
}
+ @Override
public Set<Entry<String, ConfigValue>> entrySet() {
return c.entrySet();
}
+ @Override
public boolean getBoolean(String path) {
return c.getBoolean(path);
}
+ @Override
public Number getNumber(String path) {
return c.getNumber(path);
}
+ @Override
public int getInt(String path) {
return c.getInt(path);
}
+ @Override
public long getLong(String path) {
return c.getLong(path);
}
+ @Override
public double getDouble(String path) {
return c.getDouble(path);
}
+ @Override
public String getString(String path) {
return c.getString(path);
}
+ @Override
public ConfigObject getObject(String path) {
return c.getObject(path);
}
+ @Override
public Config getConfig(String path) {
return c.getConfig(path);
}
+ @Override
public Object getAnyRef(String path) {
return c.getAnyRef(path);
}
+ @Override
public ConfigValue getValue(String path) {
return c.getValue(path);
}
+ @Override
public Long getBytes(String path) {
return c.getBytes(path);
}
+ @Override
public Long getMilliseconds(String path) {
return c.getMilliseconds(path);
}
+ @Override
public Long getNanoseconds(String path) {
return c.getNanoseconds(path);
}
+ @Override
public ConfigList getList(String path) {
return c.getList(path);
}
+ @Override
public List<Boolean> getBooleanList(String path) {
return c.getBooleanList(path);
}
+ @Override
public List<Number> getNumberList(String path) {
return c.getNumberList(path);
}
+ @Override
public List<Integer> getIntList(String path) {
return c.getIntList(path);
}
+ @Override
public List<Long> getLongList(String path) {
return c.getLongList(path);
}
+ @Override
public List<Double> getDoubleList(String path) {
return c.getDoubleList(path);
}
+ @Override
public List<String> getStringList(String path) {
return c.getStringList(path);
}
+ @Override
public List<? extends ConfigObject> getObjectList(String path) {
return c.getObjectList(path);
}
+ @Override
public List<? extends Config> getConfigList(String path) {
return c.getConfigList(path);
}
+ @Override
public List<? extends Object> getAnyRefList(String path) {
return c.getAnyRefList(path);
}
+ @Override
public List<Long> getBytesList(String path) {
return c.getBytesList(path);
}
+ @Override
public List<Long> getMillisecondsList(String path) {
return c.getMillisecondsList(path);
}
+ @Override
public List<Long> getNanosecondsList(String path) {
return c.getNanosecondsList(path);
}
+ @Override
public Config withOnlyPath(String path) {
return c.withOnlyPath(path);
}
+ @Override
public Config withoutPath(String path) {
return c.withoutPath(path);
}
+ @Override
public Config atPath(String path) {
return c.atPath(path);
}
+ @Override
public Config atKey(String key) {
return c.atKey(key);
}
+ @Override
public Config withValue(String path, ConfigValue value) {
return c.withValue(path, value);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
index 58c42fc..b49f846 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
@@ -19,7 +19,6 @@
package org.apache.drill.exec.planner.logical;
import com.google.common.collect.Lists;
-import com.sun.java.swing.plaf.windows.resources.windows;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ExpressionPosition;
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index 9db1681..6eb58c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -19,9 +19,35 @@ package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
+// TODO javadoc
public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
+ // TODO are these <?> releated in any way? Should they be the same one?
+ // TODO javadoc
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
+
+ /**
+ * Get the value vector type and id for the given schema path. The TypedFieldId
+ * should store a fieldId which is the same as the ordinal position of the field
+ * within the Iterator provided this classes implementation of Iterable<ValueVector>.
+ *
+ * @param path the path where the vector should be located.
+ * @return the local field id associated with this vector. If no field matches this
+ * path, this will return a null TypedFieldId
+ */
public TypedFieldId getValueVectorId(SchemaPath path);
+
+ /**
+ * Get the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA
+ * IterOutcome is provided.
+ *
+ * @return schema of the current batch
+ */
public BatchSchema getSchema();
+
+ /**
+ * Get the number of records.
+ *
+ * @return number of records
+ */
public int getRecordCount();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index ba8640a..3dab51f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -19,9 +19,6 @@ package org.apache.drill.exec.record.selection;
import io.netty.buffer.DrillBuf;
-import java.io.Closeable;
-import java.io.IOException;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.record.DeadBuf;
@@ -29,8 +26,8 @@ import org.apache.drill.exec.record.DeadBuf;
/**
* A selection vector that fronts, at most, a
*/
-public class SelectionVector2 implements Closeable{
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
+public class SelectionVector2 implements AutoCloseable {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
private final BufferAllocator allocator;
private int recordCount;
@@ -42,7 +39,7 @@ public class SelectionVector2 implements Closeable{
this.allocator = allocator;
}
- public int getCount(){
+ public int getCount() {
return recordCount;
}
@@ -55,7 +52,7 @@ public class SelectionVector2 implements Closeable{
if (clear) {
/* Increment the ref count for this buffer */
- bufferHandle.retain();
+ bufferHandle.retain(1);
/* We are passing ownership of the buffer to the
* caller. clear the buffer from within our selection vector
@@ -66,28 +63,27 @@ public class SelectionVector2 implements Closeable{
return bufferHandle;
}
- public void setBuffer(DrillBuf bufferHandle)
- {
+ public void setBuffer(DrillBuf bufferHandle) {
/* clear the existing buffer */
clear();
this.buffer = bufferHandle;
- buffer.retain();
+ buffer.retain(1);
}
- public char getIndex(int index){
+ public char getIndex(int index) {
return buffer.getChar(index * RECORD_SIZE);
}
- public void setIndex(int index, char value){
+ public void setIndex(int index, char value) {
buffer.setChar(index * RECORD_SIZE, value);
}
- public long getDataAddr(){
+ public long getDataAddr() {
return buffer.memoryAddress();
}
- public void setIndex(int index, int value){
+ public void setIndex(int index, int value) {
buffer.setChar(index, value);
}
@@ -106,7 +102,7 @@ public class SelectionVector2 implements Closeable{
}
@Override
- public SelectionVector2 clone(){
+ public SelectionVector2 clone() {
SelectionVector2 newSV = new SelectionVector2(allocator);
newSV.recordCount = recordCount;
newSV.buffer = buffer;
@@ -115,7 +111,7 @@ public class SelectionVector2 implements Closeable{
* same buffer, if we don't do a retain() on the newSV's
* buffer, it might get freed.
*/
- newSV.buffer.retain();
+ newSV.buffer.retain(1);
clear();
return newSV;
}
@@ -134,9 +130,7 @@ public class SelectionVector2 implements Closeable{
}
@Override
- public void close() throws IOException {
+ public void close() {
clear();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 8db0437..3b8dd0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -22,8 +22,8 @@ import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.DeadBuf;
-public class SelectionVector4 {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
+public class SelectionVector4 implements AutoCloseable {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
private ByteBuf data;
private int recordCount;
@@ -73,7 +73,7 @@ public class SelectionVector4 {
public SelectionVector4 createNewWrapperCurrent(int batchRecordCount) {
try {
data.retain();
- SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
+ final SelectionVector4 sv4 = new SelectionVector4(data, recordCount, batchRecordCount);
sv4.start = this.start;
return sv4;
} catch (SchemaChangeException e) {
@@ -116,4 +116,8 @@ public class SelectionVector4 {
}
}
+ @Override
+ public void close() {
+ clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
index 74a4afb..ac48187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -70,7 +70,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
final int pBodyLength = readRawVarint32(is);
final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength);
buffer.skipBytes(pBodyLength);
- pBody.retain();
+ pBody.retain(1);
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody);
}
@@ -94,7 +94,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes()));
}
dBody = buffer.slice();
- dBody.retain();
+ dBody.retain(1);
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Read raw body of {}", dBody);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
index 914bd00..f2ef414 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
@@ -22,17 +22,17 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
public class QueryDataBatch {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryDataBatch.class);
private final QueryData header;
private final DrillBuf data;
public QueryDataBatch(QueryData header, DrillBuf data) {
-// logger.debug("New Result Batch with header {} and data {}", header, data);
+ // logger.debug("New Result Batch with header {} and data {}", header, data);
this.header = header;
this.data = data;
if (this.data != null) {
- data.retain();
+ data.retain(1);
}
}
@@ -50,7 +50,7 @@ public class QueryDataBatch {
public void release() {
if (data != null) {
- data.release();
+ data.release(1);
}
}
@@ -58,5 +58,4 @@ public class QueryDataBatch {
public String toString() {
return "QueryResultBatch [header=" + header + ", data=" + data + "]";
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 41bb413..14c7154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -148,17 +148,17 @@ public class QueryResultHandler {
ByteBuf pBody, ByteBuf dBody ) throws RpcException {
final QueryData queryData = RpcBus.get( pBody, QueryData.PARSER );
// Current batch coming in.
- final QueryDataBatch batch = new QueryDataBatch( queryData, (DrillBuf) dBody );
+ final DrillBuf drillBuf = (DrillBuf) dBody;
+ final QueryDataBatch batch = new QueryDataBatch( queryData, drillBuf );
final QueryId queryId = queryData.getQueryId();
logger.debug( "batchArrived: queryId = {}", queryId );
logger.trace( "batchArrived: batch = {}", batch );
- UserResultsListener resultsListener = newUserResultsListener(queryId);
+ final UserResultsListener resultsListener = newUserResultsListener(queryId);
// A data case--pass on via dataArrived
-
try {
resultsListener.dataArrived(batch, throttle);
// That releases batch if successful.
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index b39a103..dde3e49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -48,7 +48,6 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
private final QueryResultHandler queryResultHandler = new QueryResultHandler();
-
private boolean supportComplexTypes = true;
public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
@@ -68,8 +67,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
- public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
- throws RpcException {
+ public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint,
+ UserProperties props, UserBitShared.UserCredentials credentials) {
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
@@ -113,7 +112,6 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
}
-
}
@Override
@@ -135,5 +133,4 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 3862cea..a3f17e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -51,7 +51,6 @@ import org.glassfish.jersey.servlet.ServletContainer;
import com.codahale.metrics.servlets.MetricsServlet;
import com.codahale.metrics.servlets.ThreadDumpServlet;
import com.google.common.base.Stopwatch;
-import com.google.common.io.Closeables;
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
@@ -287,11 +286,12 @@ public class Drillbit implements AutoCloseable {
}
}
- Closeables.closeQuietly(engine);
+ // TODO these should use a DeferredException
+ AutoCloseables.close(engine, logger);
AutoCloseables.close(storeProvider, logger);
- Closeables.closeQuietly(coord);
+ AutoCloseables.close(coord, logger);
AutoCloseables.close(manager, logger);
- Closeables.closeQuietly(context);
+ AutoCloseables.close(context, logger);
logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
isClosed = true;
@@ -328,7 +328,12 @@ public class Drillbit implements AutoCloseable {
public void run() {
logger.info("Received shutdown request.");
try {
- synchronized (idCounter) {
+ /*
+ * We can avoid metrics deregistration concurrency issues by only closing
+ * one drillbit at a time. To enforce that, we synchronize on a convenient
+ * singleton object.
+ */
+ synchronized(idCounter) {
drillbit.close();
}
} catch(final Exception e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index 7833631..a89fa86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -33,7 +33,7 @@ import org.apache.drill.common.exceptions.UserException;
public abstract class BaseJsonProcessor implements JsonProcessor {
- private static final ObjectMapper MAPPER = new ObjectMapper() //
+ private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(JsonParser.Feature.ALLOW_COMMENTS, true)
.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -41,12 +41,12 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
protected DrillBuf workBuf;
public BaseJsonProcessor(DrillBuf workBuf) {
- this.workBuf = Preconditions.checkNotNull(workBuf);
+ workBuf = Preconditions.checkNotNull(workBuf);
}
@Override
public void setSource(InputStream is) throws IOException {
- this.parser = MAPPER.getFactory().createParser(is);
+ parser = MAPPER.getFactory().createParser(is);
}
@Override
@@ -59,10 +59,10 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
String field,
String msg,
Object... args) {
- if (msg != null){
+ if (msg != null) {
exceptionBuilder.message(msg, args);
}
- if(field!=null) {
+ if(field != null) {
exceptionBuilder.pushContext("Field ", field);
}
exceptionBuilder.pushContext("Column ", parser.getCurrentLocation().getColumnNr()+1)
@@ -78,5 +78,4 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
UserException.Builder exceptionBuilder = UserException.dataReadError(e);
return getExceptionWithContext(exceptionBuilder, field, msg, args);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 74423bf..6cdbc3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -30,16 +30,16 @@ import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
@Override
public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
- List<MockScanEntry> entries = config.getReadEntries();
- List<RecordReader> readers = Lists.newArrayList();
- for(MockScanEntry e : entries){
+ final List<MockScanEntry> entries = config.getReadEntries();
+ final List<RecordReader> readers = Lists.newArrayList();
+ for(final MockScanEntry e : entries) {
readers.add(new MockRecordReader(context, e));
}
return new ScanBatch(config, context, readers.iterator());
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index e518042..622e2d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -21,22 +21,22 @@ import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike;
import org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
public class AllocationHelper {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
- public static void allocate(ValueVector v, int valueCount, int bytesPerValue){
+ public static void allocate(ValueVector v, int valueCount, int bytesPerValue) {
allocate(v, valueCount, bytesPerValue, 5);
}
public static void allocatePrecomputedChildCount(ValueVector v, int valueCount, int bytesPerValue, int childValCount){
- if(v instanceof FixedWidthVector){
+ if(v instanceof FixedWidthVector) {
((FixedWidthVector) v).allocateNew(valueCount);
} else if (v instanceof VariableWidthVector) {
((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
- }else if(v instanceof RepeatedFixedWidthVectorLike){
+ } else if(v instanceof RepeatedFixedWidthVectorLike) {
((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount);
- }else if(v instanceof RepeatedVariableWidthVectorLike){
+ } else if(v instanceof RepeatedVariableWidthVectorLike) {
((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
- }else{
+ } else {
v.allocateNew();
}
}
@@ -58,5 +58,4 @@ public class AllocationHelper {
v.allocateNew();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 624e737..dc9cadb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -80,12 +80,14 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
allocationSizeInBytes = getSizeFromCount(valueCount);
}
+ @Override
public void allocateNew() {
if (!allocateNewSafe()) {
throw new OutOfMemoryRuntimeException();
}
}
+ @Override
public boolean allocateNewSafe() {
long curAllocationSize = allocationSizeInBytes;
if (allocationMonitor > 10) {
@@ -119,6 +121,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
* @param valueCount
* The number of values which can be contained within this vector.
*/
+ @Override
public void allocateNew(int valueCount) {
final int size = getSizeFromCount(valueCount);
allocateBytes(size);
@@ -129,7 +132,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
}
- final int curSize = (int)size;
+ final int curSize = (int) size;
clear();
data = allocator.buffer(curSize);
zeroVector();
@@ -189,21 +192,27 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
this.valueCount = valueCount;
}
+ @Override
public Mutator getMutator() {
return new Mutator();
}
+ @Override
public Accessor getAccessor() {
return new Accessor();
}
+ @Override
public TransferPair getTransferPair() {
return new TransferImpl(getField());
}
+
+ @Override
public TransferPair getTransferPair(FieldReference ref) {
return new TransferImpl(getField().withPath(ref));
}
+ @Override
public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((BitVector) to);
}
@@ -211,8 +220,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
public void transferTo(BitVector target) {
target.clear();
+ if (target.data != null) {
+ target.data.release();
+ }
target.data = data;
- target.data.retain();
+ target.data.retain(1);
target.valueCount = valueCount;
clear();
}
@@ -225,19 +237,22 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
if (offset == 0) {
target.clear();
// slice
- target.data = (DrillBuf) this.data.slice(firstByte, byteSize);
- target.data.retain();
+ if (target.data != null) {
+ target.data.release();
+ }
+ target.data = (DrillBuf) data.slice(firstByte, byteSize);
+ target.data.retain(1);
} else {
// Copy data
// When the first bit starts from the middle of a byte (offset != 0), copy data from src BitVector.
// Each byte in the target is composed by a part in i-th byte, another part in (i+1)-th byte.
// The last byte copied to target is a bit tricky :
- // 1) if length requires partly byte ( length % 8 !=0), copy the remaining bits only.
+ // 1) if length requires partly byte (length % 8 !=0), copy the remaining bits only.
// 2) otherwise, copy the last byte in the same way as to the prior bytes.
target.clear();
target.allocateNew(length);
// TODO maybe do this one word at a time, rather than byte?
- for (int i = 0; i < byteSize - 1; i++) {
+ for(int i = 0; i < byteSize - 1; i++) {
target.data.setByte(i, (((this.data.getByte(firstByte + i) & 0xFF) >>> offset) + (this.data.getByte(firstByte + i + 1) << (8 - offset))));
}
if (length % 8 != 0) {
@@ -261,14 +276,17 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
this.to = to;
}
+ @Override
public BitVector getTo() {
return to;
}
+ @Override
public void transfer() {
transferTo(to);
}
+ @Override
public void splitAndTransfer(int startIndex, int length) {
splitAndTransferTo(startIndex, length, to);
}
@@ -393,6 +411,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
set(index, holder.value);
}
+ @Override
public final void setValueCount(int valueCount) {
int currentValueCapacity = getValueCapacity();
BitVector.this.valueCount = valueCount;
@@ -411,7 +430,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
@Override
public final void generateTestData(int values) {
boolean even = true;
- for (int i = 0; i < values; i++, even = !even) {
+ for(int i = 0; i < values; i++, even = !even) {
if (even) {
set(i, 1);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 85e4d1d..b7258f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.DrillBuf;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -47,7 +46,6 @@ import org.apache.drill.exec.vector.complex.impl.NullReader;
import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
-
public class RepeatedListVector extends AbstractContainerVector
implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
@@ -180,7 +178,7 @@ public class RepeatedListVector extends AbstractContainerVector
public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
- this.emptyPopulator = new EmptyValuePopulator(getOffsetVector());
+ emptyPopulator = new EmptyValuePopulator(getOffsetVector());
}
@Override
@@ -423,5 +421,4 @@ public class RepeatedListVector extends AbstractContainerVector
public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 40d0be4..84c314c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -54,13 +54,13 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-public class RepeatedMapVector extends AbstractMapVector implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
+public class RepeatedMapVector extends AbstractMapVector
+ implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
- final UInt4Vector offsets; // offsets to start of each record (considering record indices are 0-indexed)
+ private final UInt4Vector offsets; // offsets to start of each record (considering record indices are 0-indexed)
private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
private final Mutator mutator = new Mutator();
@@ -91,7 +91,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
public void setInitialCapacity(int numRecords) {
offsets.setInitialCapacity(numRecords + 1);
final Iterable<ValueVector> container = this;
- for(ValueVector v : container) {
+ for(final ValueVector v : container) {
v.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
}
}
@@ -123,7 +123,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
@Override
public List<ValueVector> getPrimitiveVectors() {
- List<ValueVector> primitiveVectors = super.getPrimitiveVectors();
+ final List<ValueVector> primitiveVectors = super.getPrimitiveVectors();
primitiveVectors.add(offsets);
return primitiveVectors;
}
@@ -134,7 +134,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
return 0;
}
long buffer = offsets.getBufferSize();
- for (ValueVector v : (Iterable<ValueVector>)this) {
+ for (final ValueVector v : (Iterable<ValueVector>) this) {
buffer += v.getBufferSize();
}
return (int) buffer;
@@ -142,8 +142,8 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
@Override
public void close() {
- super.close();
offsets.close();
+ super.close();
}
@Override
@@ -170,13 +170,13 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
int i = 0;
ValueVector vector;
- for (String child:from.getChildFieldNames()) {
+ for (final String child:from.getChildFieldNames()) {
int preSize = to.size();
vector = from.getChild(child);
if (vector == null) {
continue;
}
- ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+ final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
if (to.size() != preSize) {
newVector.allocateNew();
}
@@ -262,13 +262,13 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
this.pairs = new TransferPair[from.size()];
int i = 0;
ValueVector vector;
- for (String child:from.getChildFieldNames()) {
+ for (final String child : from.getChildFieldNames()) {
int preSize = to.size();
vector = from.getChild(child);
if (vector == null) {
continue;
}
- ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+ final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
if (allocate && to.size() != preSize) {
newVector.allocateNew();
}
@@ -305,7 +305,6 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
}
to.getMutator().setValueCount(length);
}
-
}
private static class RepeatedMapTransferPair implements TransferPair{
@@ -330,21 +329,22 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
int i = 0;
ValueVector vector;
- for (String child:from.getChildFieldNames()) {
- int preSize = to.size();
+ for (final String child : from.getChildFieldNames()) {
+ final int preSize = to.size();
vector = from.getChild(child);
if (vector == null) {
continue;
}
- ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+
+ final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
if (to.size() != preSize) {
newVector.allocateNew();
}
+
pairs[i++] = vector.makeTransferPair(newVector);
}
}
-
@Override
public void transfer() {
from.offsets.transferTo(to.offsets);
@@ -371,7 +371,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
p.copyValueSafe(i, newIndex);
}
}
- to.offsets.getMutator().setSafe(destIndex+1, newIndex);
+ to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
}
@Override
@@ -380,27 +380,25 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
final UInt4Vector.Mutator m = to.offsets.getMutator();
final int startPos = a.get(groupStart);
- final int endPos = a.get(groupStart+groups);
+ final int endPos = a.get(groupStart + groups);
final int valuesToCopy = endPos - startPos;
to.offsets.clear();
to.offsets.allocateNew(groups + 1);
int normalizedPos;
- for (int i=0; i < groups+1; i++) {
- normalizedPos = a.get(groupStart+i) - startPos;
+ for (int i = 0; i < groups + 1; i++) {
+ normalizedPos = a.get(groupStart + i) - startPos;
m.set(i, normalizedPos);
}
m.setValueCount(groups + 1);
to.emptyPopulator.populate(groups);
- for (TransferPair p : pairs) {
+ for (final TransferPair p : pairs) {
p.splitAndTransfer(startPos, valuesToCopy);
}
-
}
-
}
@@ -425,8 +423,8 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
@Override
public DrillBuf[] getBuffers(boolean clear) {
- int expectedBufferSize = getBufferSize();
- int actualBufferSize = super.getBufferSize();
+ final int expectedBufferSize = getBufferSize();
+ final int actualBufferSize = super.getBufferSize();
Preconditions.checkArgument(expectedBufferSize == actualBufferSize + offsets.getBufferSize());
return ArrayUtils.addAll(offsets.getBuffers(clear), super.getBuffers(clear));
@@ -441,7 +439,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
offsets.load(offsetField, buffer);
int bufOffset = offsetField.getBufferLength();
- for (int i=1; i<children.size(); i++) {
+ for (int i = 1; i < children.size(); i++) {
final SerializedField child = children.get(i);
final MaterializedField fieldDef = MaterializedField.create(child);
ValueVector vector = getChild(fieldDef.getLastName());
@@ -479,15 +477,14 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
}
public class RepeatedMapAccessor implements RepeatedAccessor {
-
@Override
public Object getObject(int index) {
final List<Object> list = new JsonStringArrayList();
- int end = offsets.getAccessor().get(index+1);
+ final int end = offsets.getAccessor().get(index+1);
String fieldName;
for (int i = offsets.getAccessor().get(index); i < end; i++) {
- Map<String, Object> vv = Maps.newLinkedHashMap();
- for (MaterializedField field:getField().getChildren()) {
+ final Map<String, Object> vv = Maps.newLinkedHashMap();
+ for (final MaterializedField field : getField().getChildren()) {
if (!field.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
fieldName = field.getLastName();
final Object value = getChild(fieldName).getAccessor().getObject(i);
@@ -531,21 +528,24 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
}
public void get(int index, RepeatedMapHolder holder) {
- assert index < getValueCapacity() : String.format("Attempted to access index %d when value capacity is %d", index, getValueCapacity());
- holder.start = offsets.getAccessor().get(index);
- holder.end = offsets.getAccessor().get(index+1);
+ assert index < getValueCapacity() :
+ String.format("Attempted to access index %d when value capacity is %d",
+ index, getValueCapacity());
+ final UInt4Vector.Accessor offsetsAccessor = offsets.getAccessor();
+ holder.start = offsetsAccessor.get(index);
+ holder.end = offsetsAccessor.get(index + 1);
}
public void get(int index, ComplexHolder holder) {
- FieldReader reader = getReader();
+ final FieldReader reader = getReader();
reader.setPosition(index);
holder.reader = reader;
}
public void get(int index, int arrayIndex, ComplexHolder holder) {
- RepeatedMapHolder h = new RepeatedMapHolder();
+ final RepeatedMapHolder h = new RepeatedMapHolder();
get(index, h);
- int offset = h.start + arrayIndex;
+ final int offset = h.start + arrayIndex;
if (offset >= h.end) {
holder.reader = NullReader.INSTANCE;
@@ -554,36 +554,33 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
holder.reader = reader;
}
}
-
}
-
public class Mutator implements RepeatedMutator {
-
@Override
public void startNewValue(int index) {
- emptyPopulator.populate(index+1);
- offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+ emptyPopulator.populate(index + 1);
+ offsets.getMutator().setSafe(index + 1, offsets.getAccessor().get(index));
}
@Override
public void setValueCount(int topLevelValueCount) {
emptyPopulator.populate(topLevelValueCount);
- offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1);
+ offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount + 1);
int childValueCount = offsets.getAccessor().get(topLevelValueCount);
- for (ValueVector v : getChildren()) {
+ for (final ValueVector v : getChildren()) {
v.getMutator().setValueCount(childValueCount);
}
}
@Override
- public void reset() { }
+ public void reset() {}
@Override
- public void generateTestData(int values) { }
+ public void generateTestData(int values) {}
public int add(int index) {
- final int prevEnd = offsets.getAccessor().get(index+1);
+ final int prevEnd = offsets.getAccessor().get(index + 1);
offsets.getMutator().setSafe(index + 1, prevEnd + 1);
return prevEnd;
}
@@ -594,7 +591,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
getMutator().reset();
offsets.clear();
- for(ValueVector vector:getChildren()) {
+ for(final ValueVector vector : getChildren()) {
vector.clear();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
index 1061a5c..4bf61d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Seekable;
* An InputStream that wraps a DrillBuf and implements the seekable interface.
*/
public class DrillBufInputStream extends ByteBufInputStream implements Seekable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBufInputStream.class);
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBufInputStream.class);
private final DrillBuf buffer;
@@ -52,7 +52,8 @@ public class DrillBufInputStream extends ByteBufInputStream implements Seekable
return false;
}
- public static DrillBufInputStream getStream(int start, int end, DrillBuf buffer){
+ // Does not adopt the buffer
+ public static DrillBufInputStream getStream(int start, int end, DrillBuf buffer) {
DrillBuf buf = buffer.slice(start, end - start);
return new DrillBufInputStream(buf, end - start);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 5c03c02..603776d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.easy.json.JsonProcessor;
import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
@@ -36,16 +35,14 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
-import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
public class JsonReader extends BaseJsonProcessor {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
public final static int MAX_RECORD_SIZE = 128 * 1024;
private final WorkingBuffer workingBuffer;
@@ -466,7 +463,7 @@ public class JsonReader extends BaseJsonProcessor {
writer.varChar().writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf());
}
- private void writeData(ListWriter list) throws IOException {
+ private void writeData(ListWriter list) {
list.start();
outside: while (true) {
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
index ec8c00b..0686420 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
@@ -20,33 +20,35 @@ package org.apache.drill.exec.vector.complex.impl;
import org.apache.drill.exec.vector.complex.writer.FieldWriter;
-abstract class AbstractBaseWriter implements FieldWriter{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class);
+abstract class AbstractBaseWriter implements FieldWriter {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class);
final FieldWriter parent;
private int index;
public AbstractBaseWriter(FieldWriter parent) {
- super();
this.parent = parent;
}
+ @Override
public FieldWriter getParent() {
return parent;
}
- public boolean isRoot(){
+ public boolean isRoot() {
return parent == null;
}
- int idx(){
+ int idx() {
return index;
}
- public void setPosition(int index){
+ @Override
+ public void setPosition(int index) {
this.index = index;
}
- public void end(){
+ @Override
+ public void end() {
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
index 3faa4f7..1a64978 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
@@ -21,9 +21,7 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ScalarWriter;
-
-
public interface FieldWriter extends MapWriter, ListWriter, ScalarWriter {
- public void allocate();
- public void clear();
-}
\ No newline at end of file
+ void allocate();
+ void clear();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index fbffd87..31f2e4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -60,6 +60,15 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
this.context = context;
}
+ /**
+ * Return the fragment count from construction time.
+ *
+ * @return the fragment count
+ */
+ protected int getFragmentCount() {
+ return fragmentCount;
+ }
+
@Override
public synchronized void enqueue(final RawFragmentBatch batch) throws IOException {
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index ad880da..ff348cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.rpc.RemoteConnection;
* are avialable, a fragment manager will start a fragment executor to run the associated fragment.
*/
public interface FragmentManager {
-
/**
* Handle the next incoming record batch.
*
@@ -38,36 +37,43 @@ public interface FragmentManager {
* @return True if the fragment has enough incoming data to be able to be run.
* @throws FragmentSetupException, IOException
*/
- public abstract boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException;
+ boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException;
/**
* Get the fragment runner for this incoming fragment. Note, this can only be requested once.
*
* @return
*/
- public abstract FragmentExecutor getRunnable();
+ FragmentExecutor getRunnable();
+
+ void cancel();
- public abstract void cancel();
+ /**
+ * Find out if the FragmentManager has been cancelled.
+ *
+ * @return true if the FragmentManager has been cancelled.
+ */
+ boolean isCancelled();
/**
* If the executor is paused (for testing), this method should unpause the executor. This method should handle
* multiple calls.
*/
- public abstract void unpause();
+ void unpause();
- public boolean isWaiting();
+ boolean isWaiting();
- public abstract FragmentHandle getHandle();
+ FragmentHandle getHandle();
- public abstract FragmentContext getFragmentContext();
+ FragmentContext getFragmentContext();
- public abstract void addConnection(RemoteConnection connection);
+ void addConnection(RemoteConnection connection);
- public void receivingFragmentFinished(final FragmentHandle handle);
+ void receivingFragmentFinished(final FragmentHandle handle);
/**
* Sets autoRead property on all connections
* @param autoRead
*/
- public abstract void setAutoRead(boolean autoRead);
-}
\ No newline at end of file
+ void setAutoRead(boolean autoRead);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 3fc757c..9378e51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
*/
// TODO a lot of this is the same as RootFragmentManager
public class NonRootFragmentManager implements FragmentManager {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
@@ -97,15 +97,15 @@ public class NonRootFragmentManager implements FragmentManager {
runner.receivingFragmentFinished(handle);
}
- /* (non-Javadoc)
- * @see org.apache.drill.exec.work.fragment.FragmentHandler#cancel()
- */
@Override
- public void cancel() {
- synchronized(this) {
- cancel = true;
- runner.cancel();
- }
+ public synchronized void cancel() {
+ cancel = true;
+ runner.cancel();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancel;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index f4f76dd..0713398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
// TODO a lot of this is the same as NonRootFragmentManager
public class RootFragmentManager implements FragmentManager {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
@@ -71,6 +71,11 @@ public class RootFragmentManager implements FragmentManager {
}
@Override
+ public boolean isCancelled() {
+ return cancel;
+ }
+
+ @Override
public void unpause() {
runner.unpause();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/fdb6b4fe/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
index 4f61fe7..1e35e0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
@@ -24,130 +24,129 @@ import org.junit.Test;
import org.junit.rules.TestRule;
-public class TestTpchPlanning extends PlanningBase{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchPlanning.class);
+public class TestTpchPlanning extends PlanningBase {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchPlanning.class);
@Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000);
@Test
- public void tpch01() throws Exception{
+ public void tpch01() throws Exception {
testSqlPlanFromFile("queries/tpch/01.sql");
}
@Test
@Ignore // DRILL-512
- public void tpch02() throws Exception{
+ public void tpch02() throws Exception {
testSqlPlanFromFile("queries/tpch/02.sql");
}
@Test
- public void tpch03() throws Exception{
+ public void tpch03() throws Exception {
testSqlPlanFromFile("queries/tpch/03.sql");
}
@Test
- public void tpch04() throws Exception{
+ public void tpch04() throws Exception {
testSqlPlanFromFile("queries/tpch/04.sql");
}
@Test
- public void tpch05() throws Exception{
+ public void tpch05() throws Exception {
testSqlPlanFromFile("queries/tpch/05.sql");
}
@Test
- public void tpch06() throws Exception{
+ public void tpch06() throws Exception {
testSqlPlanFromFile("queries/tpch/06.sql");
}
@Test
- public void tpch07() throws Exception{
+ public void tpch07() throws Exception {
testSqlPlanFromFile("queries/tpch/07.sql");
}
@Test
@Ignore // cannot plan exception (was DRILL-516)
- public void tpch08() throws Exception{
+ public void tpch08() throws Exception {
testSqlPlanFromFile("queries/tpch/08.sql");
}
@Test
@Ignore // cannot plan exception (was DRILL-516)
- public void tpch09() throws Exception{
+ public void tpch09() throws Exception {
testSqlPlanFromFile("queries/tpch/09.sql");
}
@Test
- public void tpch10() throws Exception{
+ public void tpch10() throws Exception {
testSqlPlanFromFile("queries/tpch/10.sql");
}
@Test
@Ignore // cartesion problem
- public void tpch11() throws Exception{
+ public void tpch11() throws Exception {
testSqlPlanFromFile("queries/tpch/11.sql");
}
@Test
- public void tpch12() throws Exception{
+ public void tpch12() throws Exception {
testSqlPlanFromFile("queries/tpch/12.sql");
}
@Test
@Ignore // sporadic failures when part of the full build.
- public void tpch13() throws Exception{
+ public void tpch13() throws Exception {
testSqlPlanFromFile("queries/tpch/13.sql");
}
@Test
- public void tpch14() throws Exception{
+ public void tpch14() throws Exception {
testSqlPlanFromFile("queries/tpch/14.sql");
}
@Test
@Ignore // requires views.
- public void tpch15() throws Exception{
+ public void tpch15() throws Exception {
testSqlPlanFromFile("queries/tpch/15.sql");
}
@Test
@Ignore // invalid plan, due to Nulls value NOT IN sub-q
- public void tpch16() throws Exception{
+ public void tpch16() throws Exception {
testSqlPlanFromFile("queries/tpch/16.sql");
}
@Test
- public void tpch17() throws Exception{
+ public void tpch17() throws Exception {
testSqlPlanFromFile("queries/tpch/17.sql");
}
@Test
- public void tpch18() throws Exception{
+ public void tpch18() throws Exception {
testSqlPlanFromFile("queries/tpch/18.sql");
}
@Test
@Ignore // DRILL-519
- public void tpch19() throws Exception{
+ public void tpch19() throws Exception {
testSqlPlanFromFile("queries/tpch/19.sql");
}
@Test
@Ignore // DRILL-517
- public void tpch20() throws Exception{
+ public void tpch20() throws Exception {
testSqlPlanFromFile("queries/tpch/20.sql");
}
@Test
@Ignore // DRILL-519
- public void tpch21() throws Exception{
+ public void tpch21() throws Exception {
testSqlPlanFromFile("queries/tpch/21.sql");
}
@Test
@Ignore // DRILL-518
- public void tpch22() throws Exception{
+ public void tpch22() throws Exception {
testSqlPlanFromFile("queries/tpch/22.sql");
}
-
}