You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/19 03:03:41 UTC
[6/7] drill git commit: DRILL-2762: Update Fragment state reporting
and error collection
DRILL-2762: Update Fragment state reporting and error collection
DeferredException
- Add new throwAndClear operation on to allow checking for exceptions preClose in FragmentContext
- Add new getAndClear operation
BufferManager
- Ensure close() can be called multiple times by clearing managed buffer list on close().
FragmentContext/FragmentExecutor
- Update FragmentContext to have a preClose so that we can check closure state before doing final close.
- Update so that there is only a single state maintained between FragmentContext and FragmentExecutor
- Clean up FragmentExecutor run() method to better manage error states and have only single terminal point (avoiding multiple messages to Foreman).
- Add new CANCELLATION_REQUESTED state for FragmentState.
- Move all users of isCancelled or isFailed in main code to use shouldContinue()
- Update receivingFragmentFinished message to not cancel fragment (only inform root operator of cancellation)
WorkManager Updates
- Add new afterExecute command to the WorkManager ExecutorService so that we get log entries if a thread leaks an exception. (Otherwise logs don't show these exceptions and they only go to standard out.)
Profile Page
- Update profile page to show last update and last progress.
- Change durations to non-time presentation
Foreman/QueryManager
- Extract listenable interfaces into anonymous inner classes from body of Foreman
QueryManager
- Update QueryManager to track completed nodes rather than completed fragments using NodeTracker
- Update DrillbitStatusListener to decrement expected completion messages on Nodes that have died to avoid query hang when a node dies
FragmentData/MinorFragmentProfile
- Add ability to track last status update as well as last time fragment made progress
AbstractRecordBatch
- Update awareness of current cancellation state to avoid cancellation delays
Misc. Other changes
- Move ByteCode optimization code to only record assembly and code as trace messages
- Update SimpleRootExec to create fake ExecutorState to make existing tests work.
- Update sort to exit prematurely in the case that the fragment was asked to cancel.
- Add finals to all edited files.
- Modify control handler and FragmentManager to directly support receivingFragmentFinished
- Update receiver propagation message to avoid premature removal of fragment manager
- Update UserException.Builder to log a message if we're creating a new UserException (ERROR for System, INFO otherwise).
- Update Profile pages to use min and max instead of sorts.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c0d5a693
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c0d5a693
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c0d5a693
Branch: refs/heads/master
Commit: c0d5a693ac70c42019b5841eea91252f9eaa7792
Parents: 071ed89
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Apr 8 19:06:03 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 18 17:56:21 2015 -0700
----------------------------------------------------------------------
.../apache/drill/common/DeferredException.java | 47 ++-
.../drill/common/concurrent/ExtendedLatch.java | 89 +++++
.../drill/common/exceptions/UserException.java | 66 ++--
.../org/apache/drill/exec/compile/AsmUtil.java | 28 +-
.../compile/bytecode/InstructionModifier.java | 28 +-
.../compile/bytecode/ScalarReplacementNode.java | 10 +-
.../apache/drill/exec/ops/BufferManager.java | 1 +
.../apache/drill/exec/ops/FragmentContext.java | 143 ++++----
.../drill/exec/physical/impl/BaseRootExec.java | 10 +-
.../impl/mergereceiver/MergingRecordBatch.java | 143 ++++----
.../impl/producer/ProducerConsumerBatch.java | 48 ++-
.../impl/project/ProjectRecordBatch.java | 250 +++++++------
.../UnorderedReceiverBatch.java | 39 +-
.../physical/impl/xsort/ExternalSortBatch.java | 4 +
.../exec/physical/impl/xsort/MSortTemplate.java | 43 ++-
.../drill/exec/proto/helper/QueryIdHelper.java | 23 +-
.../drill/exec/record/AbstractRecordBatch.java | 26 +-
.../exec/record/FragmentWritableBatch.java | 50 ++-
.../exec/rpc/data/DataResponseHandlerImpl.java | 11 +-
.../org/apache/drill/exec/server/Drillbit.java | 16 +-
.../exec/server/rest/profile/Comparators.java | 34 +-
.../server/rest/profile/FragmentWrapper.java | 73 ++--
.../server/rest/profile/OperatorWrapper.java | 26 +-
.../server/rest/profile/ProfileWrapper.java | 49 +--
.../exec/server/rest/profile/TableBuilder.java | 77 ++--
.../org/apache/drill/exec/work/WorkManager.java | 34 +-
.../exec/work/batch/ControlHandlerImpl.java | 20 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 37 +-
.../apache/drill/exec/work/foreman/Foreman.java | 127 +++----
.../drill/exec/work/foreman/FragmentData.java | 77 +++-
.../drill/exec/work/foreman/QueryManager.java | 292 ++++++++++-----
.../work/fragment/AbstractStatusReporter.java | 62 +---
.../exec/work/fragment/FragmentExecutor.java | 357 +++++++++++--------
.../exec/work/fragment/FragmentManager.java | 6 +-
.../work/fragment/NonRootFragmentManager.java | 40 ++-
.../exec/work/fragment/RootFragmentManager.java | 17 +-
.../work/fragment/StateTransitionException.java | 12 +-
.../exec/work/fragment/StatusReporter.java | 2 +-
.../exec/physical/impl/SimpleRootExec.java | 40 ++-
.../work/batch/TestUnlimitedBatchBuffer.java | 4 +-
.../drill/exec/proto/SchemaUserBitShared.java | 14 +
.../apache/drill/exec/proto/UserBitShared.java | 249 +++++++++++--
.../drill/exec/proto/beans/FragmentState.java | 4 +-
.../exec/proto/beans/MinorFragmentProfile.java | 44 +++
protocol/src/main/protobuf/UserBitShared.proto | 3 +
45 files changed, 1718 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/DeferredException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/DeferredException.java b/common/src/main/java/org/apache/drill/common/DeferredException.java
index 99f18f1..c7111a9 100644
--- a/common/src/main/java/org/apache/drill/common/DeferredException.java
+++ b/common/src/main/java/org/apache/drill/common/DeferredException.java
@@ -71,9 +71,32 @@ public class DeferredException implements AutoCloseable {
*
* @return the deferred exception, or null
*/
- public Exception getException() {
- synchronized(this) {
- return exception;
+ public synchronized Exception getException() {
+ return exception;
+ }
+
+ public synchronized Exception getAndClear() {
+ Preconditions.checkState(!isClosed);
+
+ if (exception != null) {
+ final Exception local = exception;
+ exception = null;
+ return local;
+ }
+
+ return null;
+ }
+
+ /**
+ * If an exception exists, will throw the exception and then clear it. This is so in cases where want to reuse
+ * DeferredException, we don't double report the same exception.
+ *
+ * @throws Exception
+ */
+ public synchronized void throwAndClear() throws Exception{
+ final Exception e = getAndClear();
+ if (e != null) {
+ throw e;
}
}
@@ -98,24 +121,18 @@ public class DeferredException implements AutoCloseable {
try {
autoCloseable.close();
- } catch(Exception e) {
+ } catch(final Exception e) {
addException(e);
}
}
}
@Override
- public void close() throws Exception {
- synchronized(this) {
- Preconditions.checkState(!isClosed);
-
- try {
- if (exception != null) {
- throw exception;
- }
- } finally {
- isClosed = true;
- }
+ public synchronized void close() throws Exception {
+ try {
+ throwAndClear();
+ } finally {
+ isClosed = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
new file mode 100644
index 0000000..a75ac32
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An extended CountDownLatch which allows us to await uninterruptibly.
+ */
+public class ExtendedLatch extends CountDownLatch {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExtendedLatch.class);
+
+ public ExtendedLatch() {
+ super(1);
+ }
+
+ public ExtendedLatch(final int count) {
+ super(count);
+ }
+
+ /**
+ * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
+ * state variable or similar.
+ *
+ * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
+ */
+ protected boolean ignoreInterruptions() {
+ return true;
+ }
+
+ /**
+ * Await without interruption for a given time.
+ * @param waitMillis
+ * Time in milliseconds to wait
+ * @return Whether the countdown reached zero or not.
+ */
+ public boolean awaitUninterruptibly(long waitMillis) {
+ final long targetMillis = System.currentTimeMillis() + waitMillis;
+ while (System.currentTimeMillis() < targetMillis) {
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ return false;
+ }
+
+ try {
+ return await(wait, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ // if we weren't ready, the while loop will continue to wait
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
+ * output of ignoreInterruptions();
+ */
+ public void awaitUninterruptibly() {
+ while (true) {
+ try {
+ await();
+ return;
+ } catch (final InterruptedException e) {
+ if (ignoreInterruptions()) {
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting for event latch.", e);
+ } else {
+ return;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index e995346..9283339 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -53,7 +53,7 @@ public class UserException extends DrillRuntimeException {
* Rpc layer or UserResultListener.submitFailed()
*/
@Deprecated
- public static Builder systemError(Throwable cause) {
+ public static Builder systemError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.SYSTEM, cause);
}
@@ -79,7 +79,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder connectionError(Throwable cause) {
+ public static Builder connectionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.CONNECTION, cause);
}
@@ -105,7 +105,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder dataReadError(Throwable cause) {
+ public static Builder dataReadError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.DATA_READ, cause);
}
@@ -131,7 +131,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder dataWriteError(Throwable cause) {
+ public static Builder dataWriteError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.DATA_WRITE, cause);
}
@@ -157,7 +157,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder functionError(Throwable cause) {
+ public static Builder functionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.FUNCTION, cause);
}
@@ -183,7 +183,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder parseError(Throwable cause) {
+ public static Builder parseError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PARSE, cause);
}
@@ -209,7 +209,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder permissionError(Throwable cause) {
+ public static Builder permissionError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PERMISSION, cause);
}
@@ -235,7 +235,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder planError(Throwable cause) {
+ public static Builder planError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.PLAN, cause);
}
@@ -261,7 +261,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder resourceError(Throwable cause) {
+ public static Builder resourceError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.RESOURCE, cause);
}
@@ -287,7 +287,7 @@ public class UserException extends DrillRuntimeException {
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
- public static Builder unsupportedError(Throwable cause) {
+ public static Builder unsupportedError(final Throwable cause) {
return new Builder(DrillPBError.ErrorType.UNSUPPORTED_OPERATION, cause);
}
@@ -313,7 +313,7 @@ public class UserException extends DrillRuntimeException {
* or doesn't wrap a user exception
* @param cause exception to wrap inside a user exception. Can be null
*/
- private Builder(DrillPBError.ErrorType errorType, Throwable cause) {
+ private Builder(final DrillPBError.ErrorType errorType, final Throwable cause) {
this.cause = cause;
//TODO handle the improbable case where cause is a SYSTEM exception ?
@@ -339,7 +339,7 @@ public class UserException extends DrillRuntimeException {
* @param args Arguments referenced by the format specifiers in the format string
* @return this builder
*/
- public Builder message(String format, Object... args) {
+ public Builder message(final String format, final Object... args) {
// we can't replace the message of a user exception
if (uex == null && format != null) {
this.message = String.format(format, args);
@@ -353,7 +353,7 @@ public class UserException extends DrillRuntimeException {
*
* @param endpoint drillbit endpoint identity
*/
- public Builder addIdentity(CoordinationProtos.DrillbitEndpoint endpoint) {
+ public Builder addIdentity(final CoordinationProtos.DrillbitEndpoint endpoint) {
context.add(endpoint);
return this;
}
@@ -363,7 +363,7 @@ public class UserException extends DrillRuntimeException {
* @param value string line
* @return this builder
*/
- public Builder addContext(String value) {
+ public Builder addContext(final String value) {
context.add(value);
return this;
}
@@ -375,7 +375,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, String value) {
+ public Builder addContext(final String name, final String value) {
context.add(name, value);
return this;
}
@@ -387,7 +387,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, long value) {
+ public Builder addContext(final String name, final long value) {
context.add(name, value);
return this;
}
@@ -399,7 +399,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder addContext(String name, double value) {
+ public Builder addContext(final String name, final double value) {
context.add(name, value);
return this;
}
@@ -410,7 +410,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String value) {
+ public Builder pushContext(final String value) {
context.push(value);
return this;
}
@@ -422,7 +422,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, String value) {
+ public Builder pushContext(final String name, final String value) {
context.push(name, value);
return this;
}
@@ -434,7 +434,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, long value) {
+ public Builder pushContext(final String name, final long value) {
context.push(name, value);
return this;
}
@@ -446,7 +446,7 @@ public class UserException extends DrillRuntimeException {
* @param value context value
* @return this builder
*/
- public Builder pushContext(String name, double value) {
+ public Builder pushContext(final String name, final double value) {
context.push(name, value);
return this;
}
@@ -462,7 +462,19 @@ public class UserException extends DrillRuntimeException {
return uex;
}
- return new UserException(this);
+ final UserException newException = new UserException(this);
+
+ // since we just created a new exception, we should log it for later reference. If this is a system error, this is
+ // an issue that the Drill admin should pay attention to and we should log as ERROR. However, if this is a user
+ // mistake or data read issue, the system admin should not be concerned about these and thus we'll log this
+ // as an INFO message.
+ if (errorType == DrillPBError.ErrorType.SYSTEM) {
+ logger.error(newException.getMessage(), newException);
+ } else {
+ logger.info("User Error Occurred", newException);
+ }
+
+ return newException;
}
}
@@ -470,14 +482,14 @@ public class UserException extends DrillRuntimeException {
private final UserExceptionContext context;
- protected UserException(DrillPBError.ErrorType errorType, String message, Throwable cause) {
+ protected UserException(final DrillPBError.ErrorType errorType, final String message, final Throwable cause) {
super(message, cause);
this.errorType = errorType;
this.context = new UserExceptionContext();
}
- private UserException(Builder builder) {
+ private UserException(final Builder builder) {
super(builder.message, builder.cause);
this.errorType = builder.errorType;
this.context = builder.context;
@@ -516,10 +528,10 @@ public class UserException extends DrillRuntimeException {
* @param verbose should the error object contain the verbose error message ?
* @return protobuf error object
*/
- public DrillPBError getOrCreatePBError(boolean verbose) {
- String message = verbose ? getVerboseMessage() : getMessage();
+ public DrillPBError getOrCreatePBError(final boolean verbose) {
+ final String message = verbose ? getVerboseMessage() : getMessage();
- DrillPBError.Builder builder = DrillPBError.newBuilder();
+ final DrillPBError.Builder builder = DrillPBError.newBuilder();
builder.setErrorType(errorType);
builder.setErrorId(context.getErrorId());
if (context.getEndpoint() != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
index 81904df..5e7a9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/AsmUtil.java
@@ -54,7 +54,7 @@ public class AsmUtil {
final ClassReader ver = new ClassReader(verifyWriter.toByteArray());
try {
DrillCheckClassAdapter.verify(ver, false, new PrintWriter(sw));
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.info("Caught exception verifying class:");
logClass(logger, logTag, classNode);
throw e;
@@ -98,19 +98,25 @@ public class AsmUtil {
/**
* Write a class to the log.
*
- * <p>Writes at level DEBUG.
+ * <p>
+ * Writes at level TRACE.
*
- * @param logger the logger to write to
- * @param logTag a tag to print to the log
- * @param classNode the class
+ * @param logger
+ * the logger to write to
+ * @param logTag
+ * a tag to print to the log
+ * @param classNode
+ * the class
*/
public static void logClass(final Logger logger, final String logTag, final ClassNode classNode) {
- logger.debug(logTag);
- final StringWriter stringWriter = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(stringWriter);
- final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
- classNode.accept(traceClassVisitor);
- logger.debug(stringWriter.toString());
+ if (logger.isTraceEnabled()) {
+ logger.trace(logTag);
+ final StringWriter stringWriter = new StringWriter();
+ final PrintWriter printWriter = new PrintWriter(stringWriter);
+ final TraceClassVisitor traceClassVisitor = new TraceClassVisitor(printWriter);
+ classNode.accept(traceClassVisitor);
+ logger.trace(stringWriter.toString());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
index 6c0292e..d25b1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
@@ -51,12 +51,6 @@ public class InstructionModifier extends MethodVisitor {
private int stackIncrease = 0; // how much larger we have to make the stack
- /*
- * True if we've transferred holder members to locals. If so, then the maximum stack
- * size must be increased by one to accommodate the extra DUP that is used to do so.
- */
- private boolean transferredToLocals = false;
-
public InstructionModifier(final int access, final String name, final String desc,
final String signature, final String[] exceptions, final TrackingInstructionList list,
final MethodVisitor inner) {
@@ -116,7 +110,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitInsn(int opcode) {
+ public void visitInsn(final int opcode) {
switch (opcode) {
case Opcodes.DUP:
/*
@@ -276,7 +270,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitTypeInsn(int opcode, String type) {
+ public void visitTypeInsn(final int opcode, final String type) {
/*
* This includes NEW, NEWARRAY, CHECKCAST, or INSTANCEOF.
*
@@ -285,9 +279,9 @@ public class InstructionModifier extends MethodVisitor {
* replaced the values for those, but we might find other reasons to replace
* things, in which case this will be too broad.
*/
- ReplacingBasicValue r = getFunctionReturn();
+ final ReplacingBasicValue r = getFunctionReturn();
if (r != null) {
- ValueHolderSub sub = r.getIden().getHolderSub(adder);
+ final ValueHolderSub sub = r.getIden().getHolderSub(adder);
oldToNew.put(r.getIndex(), sub);
} else {
super.visitTypeInsn(opcode, type);
@@ -295,7 +289,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitLineNumber(int line, Label start) {
+ public void visitLineNumber(final int line, final Label start) {
lastLineNumber = line;
super.visitLineNumber(line, start);
}
@@ -347,7 +341,7 @@ public class InstructionModifier extends MethodVisitor {
@Override
public void visitMaxs(final int maxStack, final int maxLocals) {
- super.visitMaxs(maxStack + stackIncrease + (transferredToLocals ? 1 : 0), maxLocals);
+ super.visitMaxs(maxStack + stackIncrease, maxLocals);
}
@Override
@@ -397,7 +391,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitMethodInsn(int opcode, String owner, String name, String desc) {
+ public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc) {
/*
* This method was deprecated in the switch from api version ASM4 to ASM5.
* If we ever go back (via CompilationConfig.ASM_API_VERSION), we need to
@@ -408,7 +402,7 @@ public class InstructionModifier extends MethodVisitor {
}
@Override
- public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) {
+ public void visitMethodInsn(final int opcode, final String owner, final String name, final String desc, final boolean itf) {
// this version of visitMethodInsn() came after ASM4
assert CompilationConfig.ASM_API_VERSION != Opcodes.ASM4;
@@ -471,7 +465,7 @@ public class InstructionModifier extends MethodVisitor {
@Override
public void visitEnd() {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("InstructionModifier ");
sb.append(name);
@@ -488,7 +482,7 @@ public class InstructionModifier extends MethodVisitor {
int itemCount = 0; // counts up the number of items found
final HashMap<ValueHolderIden, Integer> seenIdens = new HashMap<>(); // iden -> idenId
sb.append(" .oldToNew:\n");
- for (IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
+ for (final IntObjectCursor<ValueHolderIden.ValueHolderSub> ioc : oldToNew) {
final ValueHolderIden iden = ioc.value.iden();
if (!seenIdens.containsKey(iden)) {
seenIdens.put(iden, ++idenId);
@@ -501,7 +495,7 @@ public class InstructionModifier extends MethodVisitor {
}
sb.append(" .oldLocalToFirst:\n");
- for (IntIntCursor iic : oldLocalToFirst) {
+ for (final IntIntCursor iic : oldLocalToFirst) {
sb.append(" " + iic.key + " => " + iic.value + '\n');
++itemCount;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
index 6e981bc..adbf2fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
@@ -60,14 +60,14 @@ public class ScalarReplacementNode extends MethodNode {
Frame<BasicValue>[] frames;
try {
frames = analyzer.analyze(className, this);
- } catch (AnalyzerException e) {
+ } catch (final AnalyzerException e) {
throw new IllegalStateException(e);
}
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("ReplacingBasicValues for " + className + "\n");
- for(ReplacingBasicValue value : valueList) {
+ for(final ReplacingBasicValue value : valueList) {
value.dump(sb, 2);
sb.append('\n');
}
@@ -75,14 +75,14 @@ public class ScalarReplacementNode extends MethodNode {
}
// wrap the instruction handler so that we can do additional things
- TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
+ final TrackingInstructionList list = new TrackingInstructionList(frames, this.instructions);
this.instructions = list;
MethodVisitor methodVisitor = inner;
if (verifyBytecode) {
methodVisitor = new CheckMethodVisitorFsm(CompilationConfig.ASM_API_VERSION, methodVisitor);
}
- InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
+ final InstructionModifier holderV = new InstructionModifier(this.access, this.name, this.desc,
this.signature, this.exceptionsArr, list, methodVisitor);
accept(holderV);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 2d22d84..c953bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -58,6 +58,7 @@ public class BufferManager implements AutoCloseable {
((DrillBuf)mbuffers[i]).release();
}
}
+ managedBuffers.clear();
}
public DrillBuf replace(DrillBuf old, int newSize) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 44ca78a..c46613d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.ops;
-import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -27,7 +26,6 @@ import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
-import org.apache.drill.common.DeferredException;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -53,6 +51,8 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
@@ -72,31 +72,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private IncomingBuffers buffers;
private final OptionManager fragmentOptions;
private final BufferManager bufferManager;
+ private ExecutorState executorState;
- private final DeferredException deferredException = new DeferredException();
- private volatile FragmentContextState state = FragmentContextState.OK;
private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-
@Override
- public void accept(RpcException e) {
+ public void accept(final RpcException e) {
fail(e);
}
};
+
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
private final AccountingUserConnection accountingUserConnection;
- /*
- * TODO we need a state that indicates that cancellation has been requested and
- * is in progress. Early termination (such as from limit queries) could also use
- * this, as the cleanup steps should be exactly the same.
- */
- private static enum FragmentContextState {
- OK,
- FAILED,
- CANCELED
- }
-
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
@@ -111,14 +99,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
try {
- OptionList list;
+ final OptionList list;
if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
list = new OptionList();
} else {
list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
}
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
@@ -127,8 +115,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
try {
allocator = context.getAllocator().getChildAllocator(
this, fragment.getMemInitial(), fragment.getMemMax(), true);
- assert (allocator != null);
- } catch(Throwable e) {
+ Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
+ } catch(final Throwable e) {
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}
@@ -140,37 +128,29 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return fragmentOptions;
}
- public void setBuffers(IncomingBuffers buffers) {
+ public void setBuffers(final IncomingBuffers buffers) {
+ Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
this.buffers = buffers;
}
- public void fail(Throwable cause) {
- final FragmentHandle fragmentHandle = fragment.getHandle();
-
- UserException dse = UserException.systemError(cause).addIdentity(getIdentity()).build();
-
- // log the error id
- logger.error("Fragment Context received failure -- Fragment: {}:{}",
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), dse);
-
- setState(FragmentContextState.FAILED);
- deferredException.addThrowable(dse);
+ public void setExecutorState(final ExecutorState executorState) {
+ Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+ this.executorState = executorState;
}
- public void cancel() {
- setState(FragmentContextState.CANCELED);
+ public void fail(final Throwable cause) {
+ executorState.fail(cause);
}
/**
- * Allowed transitions from left to right: OK -> FAILED -> CANCELED
- * @param newState
+ * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
+ * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
+ * often so that Drill is responsive to cancellation operations.
+ *
+ * @return false if the action should terminate immediately, true if everything is okay.
*/
- private synchronized void setState(FragmentContextState newState) {
- if (state == FragmentContextState.OK) {
- state = newState;
- } else if (newState == FragmentContextState.CANCELED) {
- state = newState;
- }
+ public boolean shouldContinue() {
+ return executorState.shouldContinue();
}
public DrillbitContext getDrillbitContext() {
@@ -224,6 +204,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return frag;
}
+
+
/**
* Get this fragment's allocator.
* @return the allocator
@@ -252,11 +234,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return context.getCompiler().getImplementationClass(cg);
}
- public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+ public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
return getImplementationClass(cg.getCodeGenerator(), instanceCount);
}
- public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+ public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
return context.getCompiler().getImplementationClass(cg, instanceCount);
}
@@ -269,7 +251,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return context.getController().getTunnel(endpoint);
}
- public AccountingDataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+ public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
AccountingDataTunnel tunnel = tunnels.get(endpoint);
if (tunnel == null) {
tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
@@ -282,16 +264,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return buffers;
}
+ @VisibleForTesting
+ @Deprecated
public Throwable getFailureCause() {
- return deferredException.getException();
+ return executorState.getFailureCause();
}
+ @VisibleForTesting
+ @Deprecated
public boolean isFailed() {
- return state == FragmentContextState.FAILED;
- }
-
- public boolean isCancelled() {
- return state == FragmentContextState.CANCELED;
+ return executorState.isFailed();
}
public FunctionImplementationRegistry getFunctionRegistry() {
@@ -306,24 +288,25 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
allocator.setFragmentLimit(limit);
}
- public DeferredException getDeferredException() {
- return deferredException;
- }
-
@Override
- public void close() throws Exception {
- /*
- * TODO wait for threads working on this Fragment to terminate (or at least stop working
- * on this Fragment's query)
- */
- deferredException.suppressingClose(bufferManager);
- deferredException.suppressingClose(buffers);
- deferredException.suppressingClose(allocator);
+ public void close() {
+ waitForSendComplete();
+ suppressingClose(bufferManager);
+ suppressingClose(buffers);
+ suppressingClose(allocator);
+ }
- deferredException.close(); // must be last, as this may throw
+ private void suppressingClose(final AutoCloseable closeable) {
+ try {
+ if (closeable != null) {
+ closeable.close();
+ }
+ } catch (final Exception e) {
+ fail(e);
+ }
}
- public DrillBuf replace(DrillBuf old, int newSize) {
+ public DrillBuf replace(final DrillBuf old, final int newSize) {
return bufferManager.replace(old, newSize);
}
@@ -332,7 +315,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return bufferManager.getManagedBuffer();
}
- public DrillBuf getManagedBuffer(int size) {
+ public DrillBuf getManagedBuffer(final int size) {
return bufferManager.getManagedBuffer(size);
}
@@ -350,4 +333,30 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
sendingAccountor.waitForSendComplete();
}
+ public interface ExecutorState {
+ /**
+ * Whether execution should continue.
+ *
+ * @return false if execution should stop.
+ */
+ public boolean shouldContinue();
+
+ /**
+ * Inform the executor if a exception occurs and fragment should be failed.
+ *
+ * @param t
+ * The exception that occurred.
+ */
+ public void fail(final Throwable t);
+
+ @VisibleForTesting
+ @Deprecated
+ public boolean isFailed();
+
+ @VisibleForTesting
+ @Deprecated
+ public Throwable getFailureCause();
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 5b7ca66..628dcd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -34,7 +34,7 @@ public abstract class BaseRootExec implements RootExec {
protected OperatorContext oContext = null;
protected FragmentContext fragmentContext = null;
- public BaseRootExec(FragmentContext fragmentContext, PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = new OperatorContext(config, fragmentContext, stats, true);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -43,7 +43,7 @@ public abstract class BaseRootExec implements RootExec {
this.fragmentContext = fragmentContext;
}
- public BaseRootExec(FragmentContext fragmentContext, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
@@ -56,7 +56,7 @@ public abstract class BaseRootExec implements RootExec {
public final boolean next() {
// Stats should have been initialized
assert stats != null;
- if (fragmentContext.isFailed()) {
+ if (!fragmentContext.shouldContinue()) {
return false;
}
try {
@@ -67,7 +67,7 @@ public abstract class BaseRootExec implements RootExec {
}
}
- public final IterOutcome next(RecordBatch b){
+ public final IterOutcome next(final RecordBatch b){
stats.stopProcessing();
IterOutcome next;
try {
@@ -90,7 +90,7 @@ public abstract class BaseRootExec implements RootExec {
public abstract boolean innerNext();
@Override
- public void receivingFragmentFinished(FragmentHandle handle) {
+ public void receivingFragmentFinished(final FragmentHandle handle) {
logger.warn("Currently not handling FinishedFragment message");
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e230fd2..b8ef690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -77,7 +76,6 @@ import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.eigenbase.rel.RelFieldCollation.Direction;
-import org.eigenbase.rel.RelFieldCollation.NullDirection;
import parquet.Preconditions;
@@ -97,12 +95,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
private RecordBatchLoader[] batchLoaders;
- private RawFragmentBatchProvider[] fragProviders;
- private FragmentContext context;
+ private final RawFragmentBatchProvider[] fragProviders;
+ private final FragmentContext context;
private BatchSchema schema;
private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
- private MergingReceiverPOP config;
+ private final MergingReceiverPOP config;
private boolean hasRun = false;
private boolean prevBatchWasFull = false;
private boolean hasMoreIncoming = true;
@@ -126,9 +124,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
- public MergingRecordBatch(FragmentContext context,
- MergingReceiverPOP config,
- RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
+ public MergingRecordBatch(final FragmentContext context,
+ final MergingReceiverPOP config,
+ final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
super(config, context, true, new OperatorContext(config, context, false));
//super(config, context);
this.fragProviders = fragProviders;
@@ -138,10 +136,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.config = config;
}
- private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+ private RawFragmentBatch getNext(final RawFragmentBatchProvider provider) throws IOException{
stats.startWait();
try {
- RawFragmentBatch b = provider.getNext();
+ final RawFragmentBatch b = provider.getNext();
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
@@ -177,9 +175,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
schemaChanged = true; // first iteration is always a schema change
// set up each (non-empty) incoming record batch
- List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+ final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
int p = 0;
- for (RawFragmentBatchProvider provider : fragProviders) {
+ for (final RawFragmentBatchProvider provider : fragProviders) {
RawFragmentBatch rawBatch = null;
try {
// check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
@@ -190,10 +188,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
rawBatch = getNext(provider);
}
p++;
- if (rawBatch == null && context.isCancelled()) {
+ if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -208,10 +206,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
;
}
- if (rawBatch == null && context.isCancelled()) {
+ if (rawBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -234,12 +232,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
int i = 0;
- for (RawFragmentBatch batch : incomingBatches) {
+ for (final RawFragmentBatch batch : incomingBatches) {
// initialize the incoming batchLoaders
- UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+ final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
try {
batchLoaders[i].load(rbd, batch.getBody());
- } catch(SchemaChangeException e) {
+ } catch(final SchemaChangeException e) {
logger.error("MergingReceiver failed to load record batch from remote host. {}", e);
context.fail(e);
return IterOutcome.STOP;
@@ -250,7 +248,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
- for (RecordBatchLoader loader : batchLoaders) {
+ for (final RecordBatchLoader loader : batchLoaders) {
loader.canonicalize();
}
@@ -262,15 +260,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// create the outgoing schema and vector container, and allocate the initial batch
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+ final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
int vectorCount = 0;
- for (VectorWrapper<?> v : batchLoaders[0]) {
+ for (final VectorWrapper<?> v : batchLoaders[0]) {
// add field to the output schema
bldr.addField(v.getField());
// allocate a new value vector
- ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
+ final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
++vectorCount;
}
allocateOutgoing();
@@ -286,7 +284,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// generate code for merge operations (copy and compare)
try {
merger = createMerger();
- } catch (SchemaChangeException e) {
+ } catch (final SchemaChangeException e) {
logger.error("Failed to generate code for MergingReceiver. {}", e);
context.fail(e);
return IterOutcome.STOP;
@@ -294,9 +292,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// allocate the priority queue with the generated comparator
this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
- public int compare(Node node1, Node node2) {
- int leftIndex = (node1.batchId << 16) + node1.valueIndex;
- int rightIndex = (node2.batchId << 16) + node2.valueIndex;
+ public int compare(final Node node1, final Node node2) {
+ final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
+ final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
return merger.doEval(leftIndex, rightIndex);
}
});
@@ -305,14 +303,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
for (int b = 0; b < senderCount; ++b) {
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
try {
- RawFragmentBatch batch = getNext(fragProviders[b]);
+ final RawFragmentBatch batch = getNext(fragProviders[b]);
incomingBatches[b] = batch;
if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
- if (context.isCancelled()) {
+ if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
}
@@ -332,7 +330,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (!pqueue.isEmpty()) {
// pop next value from pq and copy to outgoing batch
- Node node = pqueue.peek();
+ final Node node = pqueue.peek();
if (!copyRecordToOutgoingBatch(node)) {
logger.debug("Outgoing vectors space is full; breaking");
prevBatchWasFull = true;
@@ -355,10 +353,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(fragProviders[node.batchId]);
}
- if (nextBatch == null && context.isCancelled()) {
+ if (nextBatch == null && !context.shouldContinue()) {
return IterOutcome.STOP;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
context.fail(e);
return IterOutcome.STOP;
}
@@ -369,7 +367,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// batch is empty
boolean allBatchesEmpty = true;
- for (RawFragmentBatch batch : incomingBatches) {
+ for (final RawFragmentBatch batch : incomingBatches) {
// see if all batches are empty so we can return OK_* or NONE
if (batch != null) {
allBatchesEmpty = false;
@@ -387,10 +385,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
continue;
}
- UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
+ final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
try {
batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
- } catch(SchemaChangeException ex) {
+ } catch(final SchemaChangeException ex) {
context.fail(ex);
return IterOutcome.STOP;
}
@@ -412,7 +410,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// set the value counts in the outgoing vectors
- for (VectorWrapper vw : outgoingContainer) {
+ for (final VectorWrapper vw : outgoingContainer) {
vw.getValueVector().getMutator().setValueCount(outgoingPosition);
}
@@ -448,19 +446,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
state = BatchState.DONE;
return;
}
- RawFragmentBatch batch = getNext(fragProviders[i]);
+ final RawFragmentBatch batch = getNext(fragProviders[i]);
if (batch.getHeader().getDef().getFieldCount() == 0) {
i++;
continue;
}
tempBatchHolder[i] = batch;
- for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
- ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+ for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
+ final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
v.allocateNew();
}
break;
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new DrillRuntimeException(e);
}
outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
@@ -473,27 +471,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public void kill(boolean sendUpstream) {
+ public void kill(final boolean sendUpstream) {
if (sendUpstream) {
informSenders();
} else {
cleanup();
- for (RawFragmentBatchProvider provider : fragProviders) {
+ for (final RawFragmentBatchProvider provider : fragProviders) {
provider.kill(context);
}
}
}
private void informSenders() {
- FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ logger.info("Informing senders of request to terminate sending.");
+ final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
.setMajorFragmentId(config.getOppositeMajorFragmentId())
.setQueryId(context.getHandle().getQueryId())
.build();
- for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
- FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+ final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
.setMinorFragmentId(providingEndpoint.getId())
.build();
- FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
.setReceiver(context.getHandle())
.setSender(sender)
.build();
@@ -504,18 +503,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
- public void failed(RpcException ex) {
+ public void failed(final RpcException ex) {
logger.warn("Failed to inform upstream that receiver is finished");
}
@Override
- public void success(Ack value, ByteBuf buffer) {
+ public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
}
@Override
- protected void killIncoming(boolean sendUpstream) {
+ protected void killIncoming(final boolean sendUpstream) {
//No op
}
@@ -535,12 +534,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
+ public TypedFieldId getValueVectorId(final SchemaPath path) {
return outgoingContainer.getValueVectorId(path);
}
@Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
return outgoingContainer.getValueAccessorById(clazz, ids);
}
@@ -549,10 +548,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return WritableBatch.get(this);
}
- private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
+ private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
- BatchSchema schema = batchLoaders[0].getSchema();
+ final BatchSchema schema = batchLoaders[0].getSchema();
for (int i = 1; i < batchLoaders.length; i++) {
if (!schema.equals(batchLoaders[i].getSchema())) {
@@ -564,8 +563,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
private void allocateOutgoing() {
- for (VectorWrapper w : outgoingContainer) {
- ValueVector v = w.getValueVector();
+ for (final VectorWrapper w : outgoingContainer) {
+ final ValueVector v = w.getValueVector();
if (v instanceof FixedWidthVector) {
AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
} else {
@@ -587,12 +586,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
try {
- CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
+ final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
ExpandableHyperContainer batch = null;
boolean first = true;
- for (RecordBatchLoader loader : batchLoaders) {
+ for (final RecordBatchLoader loader : batchLoaders) {
if (first) {
batch = new ExpandableHyperContainer(loader);
first = false;
@@ -606,7 +605,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
g.setMappingSet(COPIER_MAPPING_SET);
CopyUtil.generateCopies(g, batch, true);
g.setMappingSet(MAIN_MAPPING);
- MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
+ final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
merger.doSetup(context, batch, outgoingContainer);
return merger;
@@ -621,28 +620,28 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
- private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+ private void generateComparisons(final ClassGenerator g, final VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for (Ordering od : popConfig.getOrderings()) {
+ for (final Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
+ final ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
g.setMappingSet(LEFT_MAPPING);
- HoldingContainer left = g.addExpr(expr, false);
+ final HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
- HoldingContainer right = g.addExpr(expr, false);
+ final HoldingContainer right = g.addExpr(expr, false);
g.setMappingSet(MAIN_MAPPING);
// next we wrap the two comparison sides and add the expression block for the comparison.
- LogicalExpression fh =
+ final LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
context.getFunctionRegistry());
- HoldingContainer out = g.addExpr(fh, false);
- JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+ final HoldingContainer out = g.addExpr(fh, false);
+ final JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
@@ -660,8 +659,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
*
* @param node Reference to the next record to copy from the incoming batches
*/
- private boolean copyRecordToOutgoingBatch(Node node) {
- int inIndex = (node.batchId << 16) + node.valueIndex;
+ private boolean copyRecordToOutgoingBatch(final Node node) {
+ final int inIndex = (node.batchId << 16) + node.valueIndex;
merger.doCopy(inIndex, outgoingPosition);
outgoingPosition++;
if (outgoingPosition == OUTGOING_BATCH_SIZE) {
@@ -677,7 +676,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public class Node {
public int batchId; // incoming batch
public int valueIndex; // value within the batch
- Node(int batchId, int valueIndex) {
+ Node(final int batchId, final int valueIndex) {
this.batchId = batchId;
this.valueIndex = valueIndex;
}
@@ -687,7 +686,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void cleanup() {
outgoingContainer.clear();
if (batchLoaders != null) {
- for (RecordBatchLoader rbl : batchLoaders) {
+ for (final RecordBatchLoader rbl : batchLoaders) {
if (rbl != null) {
rbl.clear();
}
@@ -695,7 +694,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
oContext.close();
if (fragProviders != null) {
- for (RawFragmentBatchProvider f : fragProviders) {
+ for (final RawFragmentBatchProvider f : fragProviders) {
f.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c50cb8a..c2d6166 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -20,10 +20,8 @@ package org.apache.drill.exec.physical.impl.producer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -42,16 +40,16 @@ import org.apache.drill.exec.vector.ValueVector;
public class ProducerConsumerBatch extends AbstractRecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
- private RecordBatch incoming;
- private Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
+ private final RecordBatch incoming;
+ private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
private boolean running = false;
- private BlockingDeque<RecordBatchDataWrapper> queue;
+ private final BlockingDeque<RecordBatchDataWrapper> queue;
private int recordCount;
private BatchSchema schema;
private boolean stop = false;
private final CountDownLatch cleanUpLatch = new CountDownLatch(1); // used to wait producer to clean up
- protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ protected ProducerConsumerBatch(final ProducerConsumer popConfig, final FragmentContext context, final RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
this.queue = new LinkedBlockingDeque<>(popConfig.getSize());
@@ -68,8 +66,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
stats.startWait();
wrapper = queue.take();
logger.debug("Got batch from queue");
- } catch (InterruptedException e) {
- if (!(context.isCancelled() || context.isFailed())) {
+ } catch (final InterruptedException e) {
+ if (!context.shouldContinue()) {
context.fail(e);
}
return IterOutcome.STOP;
@@ -84,30 +82,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
recordCount = wrapper.batch.getRecordCount();
- boolean newSchema = load(wrapper.batch);
+ final boolean newSchema = load(wrapper.batch);
return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
}
- private boolean load(RecordBatchData batch) {
- VectorContainer newContainer = batch.getContainer();
+ private boolean load(final RecordBatchData batch) {
+ final VectorContainer newContainer = batch.getContainer();
if (schema != null && newContainer.getSchema().equals(schema)) {
container.zeroVectors();
- BatchSchema schema = container.getSchema();
+ final BatchSchema schema = container.getSchema();
for (int i = 0; i < container.getNumberOfColumns(); i++) {
- MaterializedField field = schema.getColumn(i);
- MajorType type = field.getType();
- ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+ final MaterializedField field = schema.getColumn(i);
+ final MajorType type = field.getType();
+ final ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
- ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
+ final ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()),
newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
- TransferPair tp = vIn.makeTransferPair(vOut);
+ final TransferPair tp = vIn.makeTransferPair(vOut);
tp.transfer();
}
return false;
} else {
container.clear();
- for (VectorWrapper w : newContainer) {
+ for (final VectorWrapper w : newContainer) {
container.add(w.getValueVector());
}
container.buildSchema(SelectionVectorMode.NONE);
@@ -128,7 +126,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
outer:
while (true) {
- IterOutcome upstream = incoming.next();
+ final IterOutcome upstream = incoming.next();
switch (upstream) {
case NONE:
stop = true;
@@ -146,7 +144,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
throw new UnsupportedOperationException();
}
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Producer thread is interrupted.", e);
// TODO InterruptedException
} finally {
@@ -154,7 +152,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
try {
clearQueue();
queue.put(new RecordBatchDataWrapper(null, true, false));
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
// TODO InterruptedException
}
@@ -177,12 +175,12 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
@Override
- protected void killIncoming(boolean sendUpstream) {
+ protected void killIncoming(final boolean sendUpstream) {
stop = true;
producer.interrupt();
try {
producer.join();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for producer thread");
// TODO InterruptedException
}
@@ -193,7 +191,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
stop = true;
try {
cleanUpLatch.await();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
// TODO InterruptedException
} finally {
@@ -213,7 +211,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
boolean finished;
boolean failed;
- RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean failed) {
+ RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
this.batch = batch;
this.finished = finished;
this.failed = failed;