You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/05/05 13:27:29 UTC
[5/5] drill git commit: DRILL-5423: Refactor ScanBatch to allow unit
testing record readers
DRILL-5423: Refactor ScanBatch to allow unit testing record readers
Refactors ScanBatch to allow unit testing of record reader
implementations, especially the “writer” classes.
See JIRA for details.
closes #811
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41ffed50
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41ffed50
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41ffed50
Branch: refs/heads/master
Commit: 41ffed50fbb9319b4a796555396b88de010cb10b
Parents: 0939485
Author: Paul Rogers <pr...@maprtech.com>
Authored: Sat Apr 8 20:52:04 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:46:01 2017 +0300
----------------------------------------------------------------------
.../exec/ops/AbstractOperatorExecContext.java | 90 ++++++++++++++++
.../apache/drill/exec/ops/OperatorContext.java | 51 ++--------
.../drill/exec/ops/OperatorContextImpl.java | 102 ++++++-------------
.../drill/exec/ops/OperatorExecContext.java | 46 +++++++++
.../drill/exec/ops/OperatorUtilities.java | 48 +++++++++
.../drill/exec/physical/impl/BaseRootExec.java | 5 +-
.../drill/exec/physical/impl/ScanBatch.java | 62 ++++++++---
.../drill/exec/memory/TestAllocators.java | 7 +-
.../drill/exec/record/TestRecordIterator.java | 5 +-
9 files changed, 280 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
new file mode 100644
index 0000000..a517fdf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorExecContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. Allows easy testing of operator code that uses this
+ * interface.
+ */
+
+public class AbstractOperatorExecContext implements OperatorExecContext {
+
+ protected final BufferAllocator allocator;
+ protected final ExecutionControls executionControls;
+ protected final PhysicalOperator popConfig;
+ protected final BufferManager manager;
+ protected final OperatorStatReceiver statsWriter;
+
+ public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
+ ExecutionControls executionControls,
+ OperatorStatReceiver stats) {
+ this.allocator = allocator;
+ this.popConfig = popConfig;
+ manager = new BufferManagerImpl(allocator);
+ statsWriter = stats;
+ this.executionControls = executionControls;
+ }
+
+ @Override
+ public DrillBuf replace(DrillBuf old, int newSize) {
+ return manager.replace(old, newSize);
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer() {
+ return manager.getManagedBuffer();
+ }
+
+ @Override
+ public DrillBuf getManagedBuffer(int size) {
+ return manager.getManagedBuffer(size);
+ }
+
+ @Override
+ public ExecutionControls getExecutionControls() { return executionControls; }
+
+ @Override
+ public OperatorStatReceiver getStatsWriter() { return statsWriter; }
+
+ @Override
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException("Operator context does not have an allocator");
+ }
+ return allocator;
+ }
+
+ @Override
+ public void close() {
+ try {
+ manager.close();
+ } finally {
+ if (allocator != null) {
+ allocator.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index d6045fc..b248d5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,44 +18,28 @@
package org.apache.drill.exec.ops;
import java.io.IOException;
-import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.DrillBuf;
+public interface OperatorContext extends OperatorExecContext {
-public abstract class OperatorContext {
+ OperatorStats getStats();
- public abstract DrillBuf replace(DrillBuf old, int newSize);
+ ExecutorService getExecutor();
- public abstract DrillBuf getManagedBuffer();
+ ExecutorService getScanExecutor();
- public abstract DrillBuf getManagedBuffer(int size);
+ ExecutorService getScanDecodeExecutor();
- public abstract BufferAllocator getAllocator();
+ DrillFileSystem newFileSystem(Configuration conf) throws IOException;
- public abstract OperatorStats getStats();
-
- public abstract ExecutorService getExecutor();
-
- public abstract ExecutorService getScanExecutor();
-
- public abstract ExecutorService getScanDecodeExecutor();
-
- public abstract ExecutionControls getExecutionControls();
-
- public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
-
- public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
+ DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
/**
* Run the callable as the given proxy user.
@@ -65,21 +49,6 @@ public abstract class OperatorContext {
* @param <RESULT> result type
* @return Future<RESULT> future with the result of calling the callable
*/
- public abstract <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
+ <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
Callable<RESULT> callable);
-
- public static int getChildCount(PhysicalOperator popConfig) {
- Iterator<PhysicalOperator> iter = popConfig.iterator();
- int i = 0;
- while (iter.hasNext()) {
- iter.next();
- i++;
- }
-
- if (i == 0) {
- i = 1;
- }
- return i;
- }
-
-}
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index c19cc1f..37c609e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.ops;
-import io.netty.buffer.DrillBuf;
-
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
@@ -26,10 +24,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.work.WorkManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -39,15 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-class OperatorContextImpl extends OperatorContext implements AutoCloseable {
+class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
- private final BufferAllocator allocator;
- private final ExecutionControls executionControls;
private boolean closed = false;
- private final PhysicalOperator popConfig;
private final OperatorStats stats;
- private final BufferManager manager;
private DrillFileSystem fs;
private final ExecutorService executor;
private final ExecutorService scanExecutor;
@@ -62,75 +54,43 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private ListeningExecutorService delegatePool;
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
- this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
- popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
- this.popConfig = popConfig;
- this.manager = new BufferManagerImpl(allocator);
-
- OpProfileDef def =
- new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
- stats = context.getStats().newOperatorStats(def, allocator);
- executionControls = context.getExecutionControls();
- executor = context.getDrillbitContext().getExecutor();
- scanExecutor = context.getDrillbitContext().getScanExecutor();
- scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
+ this(popConfig, context, null);
}
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
throws OutOfMemoryException {
- this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
- popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
- this.popConfig = popConfig;
- this.manager = new BufferManagerImpl(allocator);
- this.stats = stats;
- executionControls = context.getExecutionControls();
+ super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+ popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
+ popConfig, context.getExecutionControls(), stats);
+ if (stats != null) {
+ this.stats = stats;
+ } else {
+ OpProfileDef def =
+ new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
+ OperatorUtilities.getChildCount(popConfig));
+ this.stats = context.getStats().newOperatorStats(def, allocator);
+ }
executor = context.getDrillbitContext().getExecutor();
scanExecutor = context.getDrillbitContext().getScanExecutor();
scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
}
- @Override
- public DrillBuf replace(DrillBuf old, int newSize) {
- return manager.replace(old, newSize);
- }
-
- @Override
- public DrillBuf getManagedBuffer() {
- return manager.getManagedBuffer();
- }
-
- @Override
- public DrillBuf getManagedBuffer(int size) {
- return manager.getManagedBuffer(size);
- }
-
// Allow an operator to use the thread pool
@Override
public ExecutorService getExecutor() {
return executor;
}
+
@Override
public ExecutorService getScanExecutor() {
return scanExecutor;
}
+
@Override
public ExecutorService getScanDecodeExecutor() {
return scanDecodeExecutor;
}
- @Override
- public ExecutionControls getExecutionControls() {
- return executionControls;
- }
-
- @Override
- public BufferAllocator getAllocator() {
- if (allocator == null) {
- throw new UnsupportedOperationException("Operator context does not have an allocator");
- }
- return allocator;
- }
-
public boolean isClosed() {
return closed;
}
@@ -143,20 +103,19 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
}
logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
- manager.close();
-
- if (allocator != null) {
- allocator.close();
- }
-
- if (fs != null) {
- try {
- fs.close();
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
+ closed = true;
+ try {
+ super.close();
+ } finally {
+ if (fs != null) {
+ try {
+ fs.close();
+ fs = null;
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
}
}
- closed = true;
}
@Override
@@ -201,14 +160,13 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return fs;
}
- @Override
- /*
- Creates a DrillFileSystem that does not automatically track operator stats.
+ /**
+ * Creates a DrillFileSystem that does not automatically track operator stats.
*/
+ @Override
public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
fs = new DrillFileSystem(conf, null);
return fs;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
new file mode 100644
index 0000000..4d64aba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Narrowed version of the {@link OperatorContext} used to create an
+ * easy-to-test version of the operator context that excludes services
+ * that require a full Drillbit server.
+ */
+
+public interface OperatorExecContext {
+
+ DrillBuf replace(DrillBuf old, int newSize);
+
+ DrillBuf getManagedBuffer();
+
+ DrillBuf getManagedBuffer(int size);
+
+ BufferAllocator getAllocator();
+
+ ExecutionControls getExecutionControls();
+
+ OperatorStatReceiver getStatsWriter();
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
new file mode 100644
index 0000000..2e6e759
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+/**
+ * Utility methods, formerly on the OperatorContext class, that work with
+ * operators. The utilities here are available to operators at unit test
+ * time, while methods in OperatorContext are available only in production
+ * code.
+ */
+
+public class OperatorUtilities {
+
+ private OperatorUtilities() { }
+
+ public static int getChildCount(PhysicalOperator popConfig) {
+ Iterator<PhysicalOperator> iter = popConfig.iterator();
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+
+ if (count == 0) {
+ count = 1;
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index f720f8e..d01e294 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -44,7 +45,7 @@ public abstract class BaseRootExec implements RootExec {
public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = fragmentContext.newOperatorContext(config, stats);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
- config.getOperatorType(), OperatorContext.getChildCount(config)),
+ config.getOperatorType(), OperatorUtilities.getChildCount(config)),
oContext.getAllocator());
fragmentContext.getStats().addOperatorStats(this.stats);
this.fragmentContext = fragmentContext;
@@ -54,7 +55,7 @@ public abstract class BaseRootExec implements RootExec {
final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
- config.getOperatorType(), OperatorContext.getChildCount(config)),
+ config.getOperatorType(), OperatorUtilities.getChildCount(config)),
oContext.getAllocator());
fragmentContext.getStats().addOperatorStats(this.stats);
this.fragmentContext = fragmentContext;
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 011e751..5a9af39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorExecContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.common.map.CaseInsensitiveMap;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
@@ -68,19 +70,14 @@ public class ScanBatch implements CloseableRecordBatch {
/** Main collection of fields' value vectors. */
private final VectorContainer container = new VectorContainer();
- /** Fields' value vectors indexed by fields' keys. */
- private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
- CaseInsensitiveMap.newHashMap();
-
private int recordCount;
private final FragmentContext context;
private final OperatorContext oContext;
private Iterator<RecordReader> readers;
private RecordReader currentReader;
private BatchSchema schema;
- private final Mutator mutator = new Mutator();
+ private final Mutator mutator;
private boolean done = false;
- private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private boolean hasReadNonEmptyFile = false;
private Map<String, ValueVector> implicitVectors;
private Iterator<Map<String, String>> implicitColumns;
@@ -98,6 +95,7 @@ public class ScanBatch implements CloseableRecordBatch {
currentReader = readers.next();
this.oContext = oContext;
allocator = oContext.getAllocator();
+ mutator = new Mutator(oContext, allocator, container);
boolean setup = false;
try {
@@ -158,7 +156,7 @@ public class ScanBatch implements CloseableRecordBatch {
}
private void clearFieldVectorMap() {
- for (final ValueVector v : fieldVectorMap.values()) {
+ for (final ValueVector v : mutator.fieldVectorMap().values()) {
v.clear();
}
}
@@ -173,7 +171,7 @@ public class ScanBatch implements CloseableRecordBatch {
try {
injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
- currentReader.allocate(fieldVectorMap);
+ currentReader.allocate(mutator.fieldVectorMap());
} catch (OutOfMemoryException e) {
logger.debug("Caught Out of Memory Exception", e);
clearFieldVectorMap();
@@ -204,10 +202,8 @@ public class ScanBatch implements CloseableRecordBatch {
// If all the files we have read so far are just empty, the schema is not useful
if (! hasReadNonEmptyFile) {
container.clear();
- for (ValueVector v : fieldVectorMap.values()) {
- v.clear();
- }
- fieldVectorMap.clear();
+ clearFieldVectorMap();
+ mutator.clear();
}
currentReader.close();
@@ -215,7 +211,7 @@ public class ScanBatch implements CloseableRecordBatch {
implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
currentReader.setup(oContext, mutator);
try {
- currentReader.allocate(fieldVectorMap);
+ currentReader.allocate(mutator.fieldVectorMap());
} catch (OutOfMemoryException e) {
logger.debug("Caught OutOfMemoryException");
clearFieldVectorMap();
@@ -323,11 +319,41 @@ public class ScanBatch implements CloseableRecordBatch {
return container.getValueAccessorById(clazz, ids);
}
- private class Mutator implements OutputMutator {
+ /**
+ * Row set mutator implementation provided to record readers created by
+ * this scan batch. Made visible so that tests can create this mutator
+ * without also needing a ScanBatch instance. (This class is really independent
+ * of the ScanBatch, but resides here for historical reasons. This is,
+ * in turn, the only use of the generated vector readers in the vector
+ * package.)
+ */
+
+ @VisibleForTesting
+ public static class Mutator implements OutputMutator {
/** Whether schema has changed since last inquiry (via {@link #isNewSchema}). Is
* true before first inquiry. */
private boolean schemaChanged = true;
+ /** Fields' value vectors indexed by fields' keys. */
+ private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+ CaseInsensitiveMap.newHashMap();
+
+ private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private final BufferAllocator allocator;
+
+ private final VectorContainer container;
+
+ private final OperatorExecContext oContext;
+
+ public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) {
+ this.oContext = oContext;
+ this.allocator = allocator;
+ this.container = container;
+ }
+
+ public Map<String, ValueVector> fieldVectorMap() {
+ return fieldVectorMap;
+ }
@SuppressWarnings("resource")
@Override
@@ -396,6 +422,10 @@ public class ScanBatch implements CloseableRecordBatch {
public CallBack getCallBack() {
return callBack;
}
+
+ public void clear() {
+ fieldVectorMap.clear();
+ }
}
@Override
@@ -414,7 +444,7 @@ public class ScanBatch implements CloseableRecordBatch {
for (final ValueVector v : implicitVectors.values()) {
v.clear();
}
- fieldVectorMap.clear();
+ mutator.clear();
currentReader.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 288e78d..0dc2925 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -218,7 +219,7 @@ public class TestAllocators extends DrillTest {
// Use some bogus operator type to create a new operator context.
def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
- OperatorContext.getChildCount(physicalOperator1));
+ OperatorUtilities.getChildCount(physicalOperator1));
stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
// Add a couple of Operator Contexts
@@ -232,7 +233,7 @@ public class TestAllocators extends DrillTest {
OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);
def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
- OperatorContext.getChildCount(physicalOperator4));
+ OperatorUtilities.getChildCount(physicalOperator4));
stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);
DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
@@ -246,7 +247,7 @@ public class TestAllocators extends DrillTest {
// New fragment starts an operator that allocates an amount within the limit
def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
- OperatorContext.getChildCount(physicalOperator5));
+ OperatorUtilities.getChildCount(physicalOperator5));
stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
index c2429b7..847caa5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -73,7 +74,7 @@ public class TestRecordIterator extends PopUnitTestBase {
RecordBatch singleBatch = exec.getIncoming();
PhysicalOperator dummyPop = operatorList.iterator().next();
OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
- OperatorContext.getChildCount(dummyPop));
+ OperatorUtilities.getChildCount(dummyPop));
OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
int totalRecords = 0;
@@ -130,7 +131,7 @@ public class TestRecordIterator extends PopUnitTestBase {
RecordBatch singleBatch = exec.getIncoming();
PhysicalOperator dummyPop = operatorList.iterator().next();
OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
- OperatorContext.getChildCount(dummyPop));
+ OperatorUtilities.getChildCount(dummyPop));
OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0);
List<ValueVector> vectors = null;