You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:34 UTC
[13/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
new file mode 100644
index 0000000..fc03a23
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -0,0 +1,163 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class SimpleParallelizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
+
+  private final Materializer materializer = new Materializer();
+
+  /**
+   * Builds the work unit for a query. First every fragment in the planning set gets a width and a set of endpoints
+   * (never wider than the global max width); then each fragment is materialized into one PlanFragment per minor
+   * fragment.
+   *
+   * @param foremanNode
+   *          The endpoint recorded as foreman on every generated PlanFragment.
+   * @param queryId
+   *          The query id stamped on every generated fragment handle.
+   * @param activeEndpoints
+   *          The candidate endpoints handed to each wrapper for assignment.
+   * @param reader
+   *          Used to serialize each materialized fragment root to JSON.
+   * @param rootNode
+   *          The root fragment of the plan. It is recognized by reference equality while walking the planning set.
+   * @param planningSet
+   *          The set of fragment wrappers, with collected statistics, that we'll work with.
+   * @param globalMaxWidth
+   *          The maximum level of parallelization any stage of the query can use. Note that while this might be the
+   *          number of active Drillbits, realistically, this could be well beyond that number if we want to do things
+   *          like speed results return.
+   * @return The work unit holding the root operator, the root PlanFragment and the list of all other generated
+   *         PlanFragment protobuf objects to be assigned out to the individual nodes.
+   * @throws ExecutionSetupException
+   *           If endpoint assignment fails or a fragment cannot be set up.
+   */
+  public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId,
+      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode,
+      PlanningSet planningSet, int globalMaxWidth) throws ExecutionSetupException {
+    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth);
+    return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet);
+  }
+
+  /**
+   * Generates all the individual plan fragments and their assignments. All endpoints must already be assigned before
+   * materialization can happen, which is why this is a separate pass over the planning set.
+   */
+  private QueryWorkUnit generateWorkUnit(DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader,
+      Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException {
+
+    final List<PlanFragment> nonRootFragments = Lists.newArrayList();
+    PlanFragment rootPlanFragment = null;
+    FragmentRoot rootOp = null;
+
+    for (Wrapper wrapper : planningSet) {
+      final Fragment fragmentNode = wrapper.getNode();
+      // NOTE(review): costs are read from the fragment's own stats here, while assignEndpoints() reads
+      // wrapper.getStats() -- confirm both refer to the same Stats instance.
+      final Stats fragmentStats = fragmentNode.getStats();
+      final PhysicalOperator fragmentRootOp = fragmentNode.getRoot();
+      final boolean isRoot = (fragmentNode == rootNode);
+      final int width = wrapper.getWidth();
+
+      if (isRoot && width != 1) {
+        throw new FragmentSetupException(
+            String.format(
+                "Failure while trying to setup fragment. The root fragment must always have parallelization one. In the current case, the width was set to %d.",
+                width));
+      }
+
+      // a fragment is self driven (a leaf) if it doesn't rely on any other exchanges.
+      final boolean isLeaf = fragmentNode.getReceivingExchangePairs().size() == 0;
+
+      // one minor fragment per unit of width of this major fragment.
+      for (int minorId = 0; minorId < width; minorId++) {
+        final PhysicalOperator materialized =
+            fragmentRootOp.accept(materializer, new IndexedFragmentNode(minorId, wrapper));
+        Preconditions.checkArgument(materialized instanceof FragmentRoot);
+        final FragmentRoot minorRoot = (FragmentRoot) materialized;
+
+        final String planJson = toJson(reader, minorRoot);
+
+        final FragmentHandle handle = FragmentHandle //
+            .newBuilder() //
+            .setMajorFragmentId(wrapper.getMajorFragmentId()) //
+            .setMinorFragmentId(minorId) //
+            .setQueryId(queryId) //
+            .build();
+        final PlanFragment planFragment = PlanFragment.newBuilder() //
+            .setCpuCost(fragmentStats.getCpuCost()) //
+            .setDiskCost(fragmentStats.getDiskCost()) //
+            .setForeman(foremanNode) //
+            .setMemoryCost(fragmentStats.getMemoryCost()) //
+            .setNetworkCost(fragmentStats.getNetworkCost()) //
+            .setFragmentJson(planJson) //
+            .setHandle(handle) //
+            .setAssignment(wrapper.getAssignedEndpoint(minorId)) //
+            .setLeafFragment(isLeaf) //
+            .build();
+
+        if (!isRoot) {
+          nonRootFragments.add(planFragment);
+          continue;
+        }
+        // the root is kept apart from the list of remaining fragments.
+        rootPlanFragment = planFragment;
+        rootOp = minorRoot;
+      }
+    }
+
+    return new QueryWorkUnit(rootOp, rootPlanFragment, nonRootFragments);
+  }
+
+  /** Serializes a materialized fragment root to JSON, translating serialization failures into setup failures. */
+  private static String toJson(PhysicalPlanReader reader, FragmentRoot root) throws FragmentSetupException {
+    try {
+      return reader.writeJson(root);
+    } catch (JsonProcessingException e) {
+      throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
+    }
+  }
+
+  /**
+   * Decides, for every fragment, how wide it runs and where. The width ends up between 1 and the smaller of the
+   * fragment's own max width and the global max width, further limited by the fragment's disk cost. Endpoint
+   * selection itself is delegated to the wrapper.
+   */
+  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet, int globalMaxWidth)
+      throws PhysicalOperatorSetupException {
+    for (Wrapper wrapper : planningSet) {
+      final Stats stats = wrapper.getStats();
+
+      int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
+      final float diskCost = stats.getDiskCost();
+      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+
+      // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the
+      // number of tasks or the maximum width of the fragment.
+      if (diskCost < width) {
+        width = (int) diskCost;
+      }
+
+      // never go below a single minor fragment.
+      width = Math.max(width, 1);
+      logger.debug("Setting width {} on fragment {}", width, wrapper);
+      wrapper.setWidth(width);
+
+      // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
+      wrapper.assignEndpoints(allNodes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
new file mode 100644
index 0000000..729b2f1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.OperatorCost;
+
+/**
+ * Mutable accumulator for the planning statistics of a fragment: the tightest max width seen so far plus running
+ * totals of network, disk, memory and cpu cost.
+ */
+public class Stats {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Stats.class);
+
+  // starts unbounded; only ever narrowed by addMaxWidth().
+  private int maxWidth = Integer.MAX_VALUE;
+  private float networkCost;
+  private float diskCost;
+  private float memoryCost;
+  private float cpuCost;
+
+  /**
+   * Narrows the max width to the given value if that value is smaller than the current one.
+   *
+   * @param maxWidth
+   *          A width limit contributed by some operator.
+   */
+  public void addMaxWidth(int maxWidth) {
+    if (maxWidth < this.maxWidth) {
+      this.maxWidth = maxWidth;
+    }
+  }
+
+  /**
+   * Adds each component of the given operator cost to the matching running total.
+   *
+   * @param cost
+   *          The cost to accumulate.
+   */
+  public void addCost(OperatorCost cost) {
+    networkCost += cost.getNetwork();
+    diskCost += cost.getDisk();
+    memoryCost += cost.getMemory();
+    cpuCost += cost.getCpu();
+  }
+
+  public int getMaxWidth() {
+    return maxWidth;
+  }
+
+  public float getNetworkCost() {
+    return networkCost;
+  }
+
+  public float getDiskCost() {
+    return diskCost;
+  }
+
+  public float getMemoryCost() {
+    return memoryCost;
+  }
+
+  public float getCpuCost() {
+    return cpuCost;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("FragmentStats [maxWidth=");
+    text.append(maxWidth);
+    text.append(", networkCost=").append(networkCost);
+    text.append(", diskCost=").append(diskCost);
+    text.append(", memoryCost=").append(memoryCost);
+    text.append(", cpuCost=").append(cpuCost);
+    text.append("]");
+    return text.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
new file mode 100644
index 0000000..d53a78c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.HasAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Static utility that walks a fragment tree and records, on each fragment's wrapper, the cost, max width and endpoint
+ * affinity reported by the operators inside that fragment.
+ */
+public class StatsCollector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatsCollector.class);
+
+  private final static OpStatsCollector opStatCollector = new OpStatsCollector();
+
+  // static-only utility; not meant to be instantiated.
+  private StatsCollector() {
+  }
+
+  /**
+   * Collects stats for one fragment and then recurses into every fragment that feeds it.
+   */
+  private static void visit(PlanningSet planningSet, Fragment fragment) {
+    Preconditions.checkNotNull(planningSet);
+    Preconditions.checkNotNull(fragment);
+
+    // presumably the planning set creates the wrapper on first lookup -- confirm against PlanningSet.get().
+    final Wrapper fragmentWrapper = planningSet.get(fragment);
+    fragment.getRoot().accept(opStatCollector, fragmentWrapper);
+    logger.debug("Set stats to {}", fragmentWrapper.getStats());
+
+    // receivers: each pair leads to the fragment node that feeds this node.
+    for (ExchangeFragmentPair feeding : fragment) {
+      visit(planningSet, feeding.getNode());
+    }
+  }
+
+  /**
+   * Walks the fragment tree starting at the given root and gathers stats into a fresh planning set.
+   *
+   * @param rootFragment
+   *          The fragment to start from.
+   * @return The planning set populated during the walk.
+   */
+  public static PlanningSet collectStats(Fragment rootFragment) {
+    final PlanningSet collected = new PlanningSet();
+    visit(collected, rootFragment);
+    return collected;
+  }
+
+  /** Operator visitor that pushes each operator's contribution into the stats of the wrapper passed along. */
+  private static class OpStatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+
+    @Override
+    public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+      wrapper.getStats().addCost(exchange.getAggregateSendCost());
+      wrapper.getStats().addMaxWidth(exchange.getMaxSendWidth());
+      return super.visitSendingExchange(exchange, wrapper);
+    }
+
+    @Override
+    public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
+      final Stats receiverStats = wrapper.getStats();
+      receiverStats.addCost(exchange.getAggregateReceiveCost());
+      // no traversal since it would cross fragment boundary.
+      return null;
+    }
+
+    @Override
+    public Void visitScan(Scan<?> scan, Wrapper wrapper) {
+      // a scan limits the width to its number of read entries.
+      wrapper.getStats().addMaxWidth(scan.getReadEntries().size());
+      return super.visitScan(scan, wrapper);
+    }
+
+    @Override
+    public Void visitStore(Store store, Wrapper wrapper) {
+      wrapper.getStats().addMaxWidth(store.getMaxWidth());
+      return super.visitStore(store, wrapper);
+    }
+
+    @Override
+    public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
+      if (op instanceof HasAffinity) {
+        final HasAffinity withAffinity = (HasAffinity) op;
+        wrapper.addEndpointAffinity(withAffinity.getOperatorAffinity());
+      }
+      wrapper.getStats().addCost(op.getCost());
+
+      // descend into the operator's children with the same wrapper.
+      for (PhysicalOperator childOp : op) {
+        childOp.accept(this, wrapper);
+      }
+      return null;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
new file mode 100644
index 0000000..0dfcb62
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -0,0 +1,186 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner.fragment;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A wrapping class that allows us to add additional information to each fragment node for planning purposes.
+ */
+public class Wrapper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Wrapper.class);
+
+ private final Fragment node;
+ private final int majorFragmentId;
+ private int width = -1;
+ private final Stats stats;
+ private boolean endpointsAssigned;
+ private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+
+ // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the
+ // same fragment multiple times to the same endpoint.
+ private List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
+
+ public Wrapper(Fragment node, int majorFragmentId) {
+ this.majorFragmentId = majorFragmentId;
+ this.node = node;
+ this.stats = new Stats();
+ }
+
+ public Stats getStats() {
+ return stats;
+ }
+
+ public void addEndpointAffinity(List<EndpointAffinity> affinities){
+ Preconditions.checkState(!endpointsAssigned);
+ for(EndpointAffinity ea : affinities){
+ addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
+ }
+ }
+
+ public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+ Preconditions.checkState(!endpointsAssigned);
+ EndpointAffinity ea = endpointAffinity.get(endpoint);
+ if (ea == null) {
+ ea = new EndpointAffinity(endpoint);
+ endpointAffinity.put(endpoint, ea);
+ }
+
+ ea.addAffinity(affinity);
+ }
+
+ public int getMajorFragmentId() {
+ return majorFragmentId;
+ }
+
+ public int getWidth() {
+ return width;
+ }
+
+ public void setWidth(int width) {
+ Preconditions.checkState(this.width == -1);
+ this.width = width;
+ }
+
+ public Fragment getNode() {
+ return node;
+ }
+
+ private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
+
+
+ @Override
+ public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ if(exchange == node.getSendingExchange()){
+ return visitOp(exchange, value);
+ }
+ // stop on receiver exchange.
+ return null;
+ }
+
+ @Override
+ public Void visitScan(Scan<?> scan, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ scan.applyAssignments(value);
+ return super.visitScan(scan, value);
+ }
+
+ @Override
+ public Void visitStore(Store store, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ store.applyAssignments(value);
+ return super.visitStore(store, value);
+ }
+
+ @Override
+ public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException {
+ return visitChildren(op, value);
+ }
+
+ }
+
+ public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws PhysicalOperatorSetupException {
+ Preconditions.checkState(!endpointsAssigned);
+
+ endpointsAssigned = true;
+
+ List<EndpointAffinity> values = Lists.newArrayList();
+ values.addAll(endpointAffinity.values());
+
+ if (values.size() == 0) {
+ List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
+ final int div = allPossible.size();
+ int start = ThreadLocalRandom.current().nextInt(div);
+ // round robin with random start.
+ for (int i = start; i < start + width; i++) {
+ endpoints.add(all.get(i % div));
+ }
+ } else if (values.size() < width) {
+ throw new NotImplementedException(
+ "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
+ } else {
+ // get nodes with highest affinity.
+ Collections.sort(values);
+ values = Lists.reverse(values);
+ for (int i = 0; i < width; i++) {
+ endpoints.add(values.get(i).getEndpoint());
+ }
+ }
+
+ // Set scan and store endpoints.
+ AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore();
+ node.getRoot().accept(visitor, endpoints);
+
+ // Set the endpoints for this (one at most) sending exchange.
+ if (node.getSendingExchange() != null) {
+ node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
+ }
+
+ // Set the endpoints for each incoming exchange within this fragment.
+ for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) {
+ e.getExchange().setupReceivers(majorFragmentId, endpoints);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FragmentWrapper [majorFragmentId=" + majorFragmentId + ", width=" + width + ", stats=" + stats + "]";
+ }
+
+ public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
+ Preconditions.checkState(endpointsAssigned);
+ return this.endpoints.get(minorFragmentId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
deleted file mode 100644
index 562d109..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.receiver;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractReceiver;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("nway-ordering-receiver") // Jackson logical type name for this operator
-public class NWayOrderingReceiver extends AbstractReceiver{ // Placeholder receiver: every method below returns a fixed stub value.
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NWayOrderingReceiver.class);
-
- @Override
- public List<DrillbitEndpoint> getProvidingEndpoints() { // stub: no providing endpoints are tracked
- return null;
- }
-
- @Override
- public boolean supportsOutOfOrderExchange() { // always reports false
- return false;
- }
-
- @Override
- public int getSenderCount() { // stub: always reports zero senders
- return 0;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { // stub: the visitor is never invoked
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
deleted file mode 100644
index 487c645..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.receiver;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractReceiver;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("receiver-random") // Jackson logical type name for this operator
-public class RandomReceiver extends AbstractReceiver{ // Placeholder receiver: every method below returns a fixed stub value.
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
-
- @Override
- public List<DrillbitEndpoint> getProvidingEndpoints() { // stub: no providing endpoints are tracked
- return null;
- }
-
- @Override
- public boolean supportsOutOfOrderExchange() { // always reports false
- return false;
- }
-
- @Override
- public int getSenderCount() { // stub: always reports zero senders
- return 0;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { // stub: the visitor is never invoked
- return null;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
deleted file mode 100644
index b0fb51c..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.pop.sender;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractSender;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("hash-partition-sender") // Jackson logical type name for this operator
-public class HashPartitionSender extends AbstractSender { // Placeholder sender: wraps a child operator, remaining methods return stub values.
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
-
- public HashPartitionSender(PhysicalOperator child) { // forwards the child operator to AbstractSender
- super(child);
- }
-
-
- @Override
- public List<DrillbitEndpoint> getDestinations() { // stub: no destinations are tracked
- return null;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { // stub: the visitor is never invoked
- return null;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index d3e4b23..05b1cc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -17,25 +17,19 @@
******************************************************************************/
package org.apache.drill.exec.record;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.vector.ValueVector;
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.google.common.collect.Lists;
-public class BatchSchema implements Iterable<MaterializedField>{
+public class BatchSchema implements Iterable<MaterializedField> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-
+
private final List<MaterializedField> fields;
- private final boolean hasSelectionVector;
-
- private BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
+ final boolean hasSelectionVector;
+
+ BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
this.fields = fields;
this.hasSelectionVector = hasSelectionVector;
}
@@ -45,88 +39,16 @@ public class BatchSchema implements Iterable<MaterializedField>{
return fields.iterator();
}
- public void addAnyField(short fieldId, boolean nullable, ValueMode mode){
- addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+ public static SchemaBuilder newBuilder() {
+ return new SchemaBuilder();
}
-
- public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass){
- fields.add(new MaterializedField(fieldId, type, nullable, mode, valueClass));
+
+ @Override
+ public String toString() {
+ return "BatchSchema [fields=" + fields + ", hasSelectionVector=" + hasSelectionVector + "]";
}
+
- /**
- * Builder to build BatchSchema. Can have a supporting expected object. If the expected Schema object is defined, the builder will always check that this schema is a equal or more materialized version of the current schema.
- */
- public class BatchSchemaBuilder{
- private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
- private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
-
- private boolean hasSelectionVector;
-
- public BatchSchemaBuilder(BatchSchema expected){
- for(MaterializedField f: expected){
- expectedFields.put(f.getFieldId(), f);
- }
- hasSelectionVector = expected.hasSelectionVector;
- }
-
- public BatchSchemaBuilder(){
- }
-
-
- /**
- * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass will be set to null.
- * @param fieldId The desired fieldId. Should be unique for this BatchSchema.
- * @param nullable Whether this field supports nullability.
- * @param mode
- * @throws SchemaChangeException
- */
- public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException{
- addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
- }
-
- public void setSelectionVector(boolean hasSelectionVector){
- this.hasSelectionVector = hasSelectionVector;
- }
-
- private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
- MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
- if(expectedFields != null){
- if(!expectedFields.containsKey(f.getFieldId())) throw new SchemaChangeException(String.format("You attempted to add a field for Id An attempt was made to add a duplicate fieldId to the schema. The offending fieldId was %d", fieldId));
- f.checkMaterialization(expectedFields.lget());
- }
- fields.put(f.getFieldId(), f);
- }
-
- public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
- if(fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to add a duplicate fieldId to the schema. The offending fieldId was %d", fieldId));
- setTypedField(fieldId, type, nullable, mode, valueClass);
- }
-
- public void replaceTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
- if(!fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to replace a field in the schema, however the schema does not currently contain that field id. The offending fieldId was %d", fieldId));
- setTypedField(fieldId, type, nullable, mode, valueClass);
- }
-
-// public void addVector(ValueVector<?> v){
-//
-// }
-//
-// public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
-//
-// }
-
-
- public BatchSchema buildAndClear() throws SchemaChangeException{
- // check if any fields are unaccounted for.
-
- List<MaterializedField> fieldList = Lists.newArrayList();
- for(MaterializedField f : fields.values){
- if(f != null) fieldList.add(f);
- }
- Collections.sort(fieldList);
- return new BatchSchema(this.hasSelectionVector, fieldList);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
new file mode 100644
index 0000000..c19065d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/**
+ * Pairs the buffers of a {@link WritableBatch} with a {@link FragmentRecordBatch} header that carries the batch
+ * definition together with the ids of the sending fragment and the handle of the receiving fragment.
+ */
+public class FragmentWritableBatch{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
+
+ private final ByteBuf[] buffers;
+ private final FragmentRecordBatch header;
+
+ /**
+ * Captures the buffers of {@code batch} and builds the header that accompanies them.
+ *
+ * @param isLast value stored in the header's "is last batch" flag
+ * @param queryId query id placed in the receiving fragment's handle
+ * @param sendMajorFragmentId major fragment id recorded in the header as the sender
+ * @param sendMinorFragmentId minor fragment id recorded in the header as the sender
+ * @param receiveMajorFragmentId major fragment id placed in the receiving fragment's handle
+ * @param receiveMinorFragmentId minor fragment id placed in the receiving fragment's handle
+ * @param batch source of both the buffers and the batch definition stored in the header
+ */
+ public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
+ this.buffers = batch.getBuffers();
+ // The handle identifies the receiving fragment; the sender is recorded separately on the header below.
+ FragmentHandle handle = FragmentHandle //
+ .newBuilder() //
+ .setMajorFragmentId(receiveMajorFragmentId) //
+ .setMinorFragmentId(receiveMinorFragmentId) //
+ .setQueryId(queryId) //
+ .build();
+ this.header = FragmentRecordBatch //
+ .newBuilder() //
+ .setIsLastBatch(isLast) //
+ .setDef(batch.getDef()) //
+ .setHandle(handle) //
+ .setSendingMajorFragmentId(sendMajorFragmentId) //
+ .setSendingMinorFragmentId(sendMinorFragmentId) //
+ .build();
+ }
+
+ /** @return the buffer array obtained from the wrapped batch (returned as-is, not copied). */
+ public ByteBuf[] getBuffers(){
+ return buffers;
+ }
+
+ /** @return the header built in the constructor. */
+ public FragmentRecordBatch getHeader() {
+ return header;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
index 403c7a3..d820e0e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
@@ -17,7 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.record;
-import org.apache.drill.exec.exception.ExecutionSetupException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
public class InvalidValueAccessor extends ExecutionSetupException{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
new file mode 100644
index 0000000..718396e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+/**
+ * Jackson serializer and deserializer for the protobuf {@link MajorType}. Both directions go through
+ * {@link MajorTypeHolder}, a plain holder object that Jackson reads and writes.
+ */
+public class MajorTypeSerDe {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MajorTypeSerDe.class);
+
+
+ /** Deserializer: reads a {@link MajorTypeHolder} from the parser and converts it to a {@link MajorType}. */
+ public static class De extends StdDeserializer<MajorType> {
+
+ public De() {
+ super(MajorType.class);
+ }
+
+ @Override
+ public MajorType deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+ JsonProcessingException {
+ return jp.readValueAs(MajorTypeHolder.class).getMajorType();
+ }
+
+
+ }
+
+
+ /** Serializer: copies the {@link MajorType} into a {@link MajorTypeHolder} and writes that object. */
+ public static class Se extends StdSerializer<MajorType> {
+
+ public Se() {
+ super(MajorType.class);
+ }
+
+ @Override
+ public void serialize(MajorType value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ MajorTypeHolder holder = MajorTypeHolder.get(value);
+ jgen.writeObject(holder);
+ }
+
+ }
+
+ /**
+ * JSON-facing mirror of {@link MajorType}. The minor type is exposed under the property name "type"; width,
+ * precision and scale are boxed so that an unset value is null, and null properties are left out of the output
+ * (see the NON_NULL inclusion setting).
+ */
+ @JsonInclude(Include.NON_NULL)
+ public static class MajorTypeHolder{
+ @JsonProperty("type") public MinorType minorType;
+ public DataMode mode;
+ public Integer width;
+ public Integer precision;
+ public Integer scale;
+
+ @JsonCreator
+ public MajorTypeHolder(@JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+ super();
+ this.minorType = minorType;
+ this.mode = mode;
+ this.width = width;
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ // Used only by get(MajorType), which fills the fields in afterwards.
+ private MajorTypeHolder(){}
+
+ /**
+ * Builds a {@link MajorType} from this holder. Mode and minor type are always passed to the builder; precision,
+ * width and scale are set only when non-null.
+ * NOTE(review): a null mode or minorType is handed to the builder unchecked - confirm callers always supply both.
+ */
+ @JsonIgnore
+ public MajorType getMajorType(){
+ MajorType.Builder b = MajorType.newBuilder();
+ b.setMode(mode);
+ b.setMinorType(minorType);
+ if(precision != null) b.setPrecision(precision);
+ if(width != null) b.setWidth(width);
+ if(scale != null) b.setScale(scale);
+ return b.build();
+ }
+
+ /**
+ * Creates a holder from a {@link MajorType}. Precision, scale and width are copied only when the message reports
+ * them as present, so absent values stay null.
+ */
+ public static MajorTypeHolder get(MajorType mt){
+ MajorTypeHolder h = new MajorTypeHolder();
+ h.minorType = mt.getMinorType();
+ h.mode = mt.getMode();
+ if(mt.hasPrecision()) h.precision = mt.getPrecision();
+ if(mt.hasScale()) h.scale = mt.getScale();
+ if(mt.hasWidth()) h.width = mt.getWidth();
+ return h;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 2e941a2..09427ef 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -17,67 +17,152 @@
******************************************************************************/
package org.apache.drill.exec.record;
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.exception.SchemaChangeException;
-
-public class MaterializedField implements Comparable<MaterializedField>{
- private int fieldId;
- private DataType type;
- private boolean nullable;
- private ValueMode mode;
- private Class<?> valueClass;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.vector.TypeHelper;
+
+public class MaterializedField implements Comparable<MaterializedField> {
+ private final FieldDef def;
+
+ public MaterializedField(FieldDef def) {
+ this.def = def;
+ }
+
+ public static MaterializedField create(FieldDef def){
+ return new MaterializedField(def);
+ }
+
+ public static MaterializedField create(SchemaPath path, int fieldId, int parentId, MajorType type) {
+ FieldDef.Builder b = FieldDef.newBuilder();
+ b.setFieldId(fieldId);
+ b.setMajorType(type);
+ addSchemaPathToFieldDef(path, b);
+ b.setParentId(parentId);
+ return create(b.build());
+ }
+
+ private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) {
+ for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
+ NamePart.Builder b = NamePart.newBuilder();
+ if (p.isArray()) {
+ b.setType(Type.ARRAY);
+ } else {
+ b.setName(p.getNameSegment().getPath().toString());
+ b.setType(Type.NAME);
+ }
+ builder.addName(b.build());
+ if(p.isLastPath()) break;
+ }
+ }
+
+ public FieldDef getDef() {
+ return def;
+ }
- public MaterializedField(int fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) {
- super();
- this.fieldId = fieldId;
- this.type = type;
- this.nullable = nullable;
- this.mode = mode;
- this.valueClass = valueClass;
+ public String getName(){
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for(NamePart np : def.getNameList()){
+ if(np.getType() == Type.ARRAY){
+ sb.append("[]");
+ }else{
+ if(first){
+ first = false;
+ }else{
+ sb.append(".");
+ }
+ sb.append(np.getName());
+
+ }
+ }
+ return sb.toString();
+ }
+
+ public int getWidth() {
+ return def.getMajorType().getWidth();
}
public int getFieldId() {
- return fieldId;
+ return def.getFieldId();
}
- public DataType getType() {
- return type;
+ public MajorType getType() {
+ return def.getMajorType();
}
public boolean isNullable() {
- return nullable;
+ return def.getMajorType().getMode() == DataMode.OPTIONAL;
}
- public ValueMode getMode() {
- return mode;
+ public DataMode getDataMode() {
+ return def.getMajorType().getMode();
}
public Class<?> getValueClass() {
- return valueClass;
+ return TypeHelper.getValueVectorClass(getType().getMinorType(), getDataMode());
}
- private void check(String name, Object val1, Object expected) throws SchemaChangeException{
- if(expected.equals(val1)) return;
- throw new SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name, val1, name, expected);
- }
-
- public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
- if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s", this.type, expected.type);
- if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
- check("fieldId", this.fieldId, expected.fieldId);
- check("nullability", this.nullable, expected.nullable);
- check("valueMode", this.mode, expected.mode);
- }
+ public boolean matches(SchemaPath path) {
+ Iterator<NamePart> iter = def.getNameList().iterator();
+
+ for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
+ if(p == null) break;
+ if (!iter.hasNext()) return false;
+ NamePart n = iter.next();
+
+ if (p.isArray()) {
+ if (n.getType() == Type.ARRAY) continue;
+ return false;
+ } else {
+ if (p.getNameSegment().getPath().equals(n.getName())) continue;
+ return false;
+ }
+
+ }
+ // we've reviewed all path segments. confirm that we don't have any extra name parts.
+ return !iter.hasNext();
- public MaterializedField getNullableVersion(Class<?> valueClass){
- return new MaterializedField(fieldId, type, true, mode, valueClass);
}
-
+
+ // private void check(String name, Object val1, Object expected) throws SchemaChangeException{
+ // if(expected.equals(val1)) return;
+ // throw new
+ // SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name,
+ // val1, name, expected);
+ // }
+
+ // public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
+ // if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new
+ // SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s",
+ // this.type, expected.type);
+ // if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
+ // check("fieldId", this.fieldId, expected.fieldId);
+ // check("nullability", this.nullable, expected.nullable);
+ // check("valueMode", this.mode, expected.mode);
+ // }
+ //
+ // public MaterializedField getNullableVersion(Class<?> valueClass){
+ // return new MaterializedField(path, fieldId, type, true, mode, valueClass);
+ // }
+
@Override
public int compareTo(MaterializedField o) {
- return Integer.compare(this.fieldId, o.fieldId);
+ return Integer.compare(this.getFieldId(), o.getFieldId());
}
-
+
+ @Override
+ public String toString() {
+ return "MaterializedField [" + def.toString() + "]";
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
new file mode 100644
index 0000000..c244cea
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+
+/**
+ * Immutable pairing of a {@link FragmentRecordBatch} header with the {@link ByteBuf} body that accompanies it.
+ * Neither value is copied or validated here.
+ */
+public class RawFragmentBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
+
+ final FragmentRecordBatch header;
+ final ByteBuf body;
+
+ /**
+ * @param header the batch header
+ * @param body the buffer stored alongside the header
+ */
+ public RawFragmentBatch(FragmentRecordBatch header, ByteBuf body) {
+ super();
+ this.header = header;
+ this.body = body;
+ }
+
+ /** @return the header supplied at construction. */
+ public FragmentRecordBatch getHeader() {
+ return header;
+ }
+
+ /** @return the body buffer supplied at construction (the same instance, not a copy). */
+ public ByteBuf getBody() {
+ return body;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
new file mode 100644
index 0000000..08b0e11
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.ops.FragmentContext;
+
+/**
+ * A source of {@link RawFragmentBatch} instances.
+ * NOTE(review): blocking behaviour and the end-of-stream signal of getNext() are not defined here - confirm
+ * against the implementations.
+ */
+public interface RawFragmentBatchProvider {
+
+ /** @return the next raw batch from this provider. */
+ public RawFragmentBatch getNext();
+ /**
+ * Asks the provider to stop.
+ * @param context the fragment context passed to the provider; presumably that of the fragment being terminated - verify
+ */
+ public void kill(FragmentContext context);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index eca62bb..3e4ded2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -34,8 +34,9 @@ public interface RecordBatch {
NONE, // No more records were found.
OK, // A new range of records have been provided.
OK_NEW_SCHEMA, // A full collection of records
- STOP // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
+ STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
// to understand the current state of things.
+ NOT_YET // used by batches that haven't received incoming data yet.
}
/**
@@ -81,5 +82,11 @@ public interface RecordBatch {
* @return An IterOutcome describing the result of the iteration.
*/
public IterOutcome next();
+
+ /**
+ * Get a writable version of this batch. Takes over ownership of existing buffers.
+ * @return
+ */
+ public WritableBatch getWritableBatch();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
new file mode 100644
index 0000000..d990198
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -0,0 +1,143 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+
+/**
+ * Holds the value vectors of the most recently loaded record batch, keyed by field id. Vectors are reused across
+ * loads when a field's definition is unchanged; otherwise they are closed and replaced.
+ */
+public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector<?>>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
+
+  private IntObjectOpenHashMap<ValueVector<?>> vectors = new IntObjectOpenHashMap<ValueVector<?>>();
+  private final BufferAllocator allocator;
+  private int recordCount;
+  private BatchSchema schema;
+
+  /**
+   * @param allocator allocator handed to every newly created vector
+   */
+  public RecordBatchLoader(BufferAllocator allocator) {
+    super();
+    this.allocator = allocator;
+  }
+
+  /**
+   * Load a record batch from a single buffer.
+   *
+   * @param def
+   *          The definition for the record batch.
+   * @param buf
+   *          The buffer that holds the data associated with the record batch. Field data is assumed to be laid out
+   *          back-to-back in the order of the definition's field list, each field occupying the number of bytes
+   *          given by its metadata's buffer length (TODO confirm against the writer side).
+   * @return Whether or not the schema changed since the previous load.
+   * @throws SchemaChangeException
+   *           declared by this method; nothing in this body raises it directly.
+   */
+  public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException {
+//    logger.debug("Loading record batch with def {} and data {}", def, buf);
+    this.recordCount = def.getRecordCount();
+    boolean schemaChanged = false;
+
+    IntObjectOpenHashMap<ValueVector<?>> newVectors = new IntObjectOpenHashMap<ValueVector<?>>();
+
+    List<FieldMetadata> fields = def.getFieldList();
+
+    int bufOffset = 0;
+    for (FieldMetadata fmd : fields) {
+      FieldDef fieldDef = fmd.getDef();
+
+      // Carve out this field's region and advance the offset so that the next field does not re-read the same
+      // bytes. (Previously the offset was never advanced, so every field was sliced from position 0.)
+      ByteBuf fieldBuf = buf.slice(bufOffset, fmd.getBufferLength());
+      bufOffset += fmd.getBufferLength();
+
+      ValueVector<?> v = vectors.remove(fieldDef.getFieldId());
+      if (v != null) {
+        if (v.getField().getDef().equals(fieldDef)) {
+          // Same definition as last time: reuse the existing vector.
+          v.setTo(fmd, fieldBuf);
+          newVectors.put(fieldDef.getFieldId(), v);
+          continue;
+        } else {
+          v.close();
+          v = null;
+        }
+      }
+      // if we arrive here, either the metadata didn't match, or we didn't find a vector.
+      schemaChanged = true;
+      MaterializedField m = new MaterializedField(fieldDef);
+      v = TypeHelper.getNewVector(m, allocator);
+      v.setTo(fmd, fieldBuf);
+      newVectors.put(fieldDef.getFieldId(), v);
+    }
+
+    // Whatever is still in 'vectors' belongs to fields that are absent from the new batch. Close those stale
+    // vectors - not the freshly populated ones in 'newVectors', which are about to become the current state.
+    if(!vectors.isEmpty()){
+      schemaChanged = true;
+      for(IntObjectCursor<ValueVector<?>> cursor : vectors){
+        cursor.value.close();
+      }
+    }
+
+    if(schemaChanged){
+      // rebuild the schema.
+      SchemaBuilder b = BatchSchema.newBuilder();
+      for(IntObjectCursor<ValueVector<?>> cursor : newVectors){
+        b.addField(cursor.value.getField());
+      }
+      b.setSelectionVector(false);
+      this.schema = b.build();
+    }
+    vectors = newVectors;
+    return schemaChanged;
+
+  }
+
+  /**
+   * Returns the loaded vector for a field, checked against the expected vector class.
+   *
+   * @param fieldId id of the field whose vector is wanted
+   * @param clazz exact class the vector is expected to have
+   * @return the vector registered under {@code fieldId}
+   * @throws InvalidValueAccessor if no vector is loaded for {@code fieldId}, or if the loaded vector's class is not
+   *           exactly {@code clazz}
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    ValueVector<?> v = vectors.get(fieldId);
+    // Explicit check instead of an assert: with assertions disabled a missing field would otherwise surface as a
+    // bare NullPointerException on the getClass() call below.
+    if (v == null)
+      throw new InvalidValueAccessor(String.format(
+          "Failure while reading vector. No vector is loaded for field id %d.", fieldId));
+    if (v.getClass() != clazz)
+      throw new InvalidValueAccessor(String.format(
+          "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+    return (T) v;
+  }
+
+  /** @return the record count taken from the definition passed to the most recent {@code load} call. */
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+
+  /** @return a writable batch created from the current record count and vectors. */
+  public WritableBatch getWritableBatch(){
+    return WritableBatch.get(recordCount, vectors);
+  }
+
+  @Override
+  public Iterator<IntObjectCursor<ValueVector<?>>> iterator() {
+    return this.vectors.iterator();
+  }
+
+  /** @return the schema built by the last load that detected a schema change; null before any such load. */
+  public BatchSchema getSchema(){
+    return schema;
+  }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
new file mode 100644
index 0000000..1e25b1a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.google.common.collect.Lists;
+
+/**
+ * A reusable builder that supports the creation of BatchSchemas. Can have a supporting expected object. If the expected Schema object is defined, the
+ * builder will always check that this schema is a equal or more materialized version of the current schema.
+ */
+public class SchemaBuilder {
+ private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
+ private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
+
+ private boolean hasSelectionVector;
+
+ public SchemaBuilder(BatchSchema expected) {
+ for (MaterializedField f : expected) {
+ expectedFields.put(f.getFieldId(), f);
+ }
+ hasSelectionVector = expected.hasSelectionVector;
+ }
+
+ SchemaBuilder() {
+ }
+
+ /**
+ * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
+ * will be set to null.
+ *
+ * @param fieldId
+ * The desired fieldId. Should be unique for this BatchSchema.
+ * @param nullable
+ * Whether this field supports nullability.
+ * @param mode
+ * @throws SchemaChangeException
+ */
+// public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException {
+// addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+// }
+
+ public void setSelectionVector(boolean hasSelectionVector) {
+ this.hasSelectionVector = hasSelectionVector;
+ }
+
+
+// private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+// throws SchemaChangeException {
+// MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
+// if (expectedFields != null) {
+// if (!expectedFields.containsKey(f.getFieldId()))
+// throw new SchemaChangeException(
+// String
+// .format(
+// "You attempted to add a field for Id An attempt was made to add a duplicate fieldId to the schema. The offending fieldId was %d",
+// fieldId));
+// f.checkMaterialization(expectedFields.lget());
+// }
+// fields.put(f.getFieldId(), f);
+// }
+
+ public void addField(MaterializedField f){
+ fields.put(f.getFieldId(), f);
+ }
+
+// public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+// throws SchemaChangeException {
+// if (fields.containsKey(fieldId))
+// throw new SchemaChangeException(String.format(
+// "An attempt was made to add a duplicate fieldId to the schema. The offending fieldId was %d", fieldId));
+// setTypedField(fieldId, type, nullable, mode, valueClass);
+// }
+//
+// public void replaceTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
+// throws SchemaChangeException {
+// if (!fields.containsKey(fieldId))
+// throw new SchemaChangeException(
+// String.format("An attempt was made to replace a field in the schema, however the schema does " +
+// "not currently contain that field id. The offending fieldId was %d", fieldId));
+// setTypedField(fieldId, type, nullable, mode, valueClass);
+// }
+
+ public void removeField(short fieldId) throws SchemaChangeException{
+ MaterializedField f = fields.remove(fieldId);
+ if(f == null) throw new SchemaChangeException("You attempted to remove an nonexistent field.");
+ }
+
+ /**
+ * Generate a new BatchSchema object based on the current state of the builder.
+ * @return
+ * @throws SchemaChangeException
+ */
+ public BatchSchema build() throws SchemaChangeException {
+ // check if any fields are unaccounted for.
+
+ List<MaterializedField> fieldList = Lists.newArrayList();
+ for (ObjectCursor<MaterializedField> f : fields.values()) {
+ if (f != null) fieldList.add(f.value);
+ }
+ Collections.sort(fieldList);
+ return new BatchSchema(this.hasSelectionVector, fieldList);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
new file mode 100644
index 0000000..788c731
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+
+/**
+ * A specialized version of record batch that can moves out buffers and preps them for writing.
+ */
+public class WritableBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);
+
+ private final RecordBatchDef def;
+ private final ByteBuf[] buffers;
+
+ public WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
+ logger.debug("Created new writable batch with def {} and buffers {}", def, buffers);
+ this.def = def;
+ this.buffers = buffers.toArray(new ByteBuf[buffers.size()]);
+ }
+
+ public WritableBatch(RecordBatchDef def, ByteBuf[] buffers) {
+ super();
+ this.def = def;
+ this.buffers = buffers;
+ }
+
+
+ public RecordBatchDef getDef(){
+ return def;
+ }
+ public ByteBuf[] getBuffers(){
+ return buffers;
+ }
+
+// public static WritableBatch get(ValueVector<?>[] vectors){
+// WritableCreator c = new WritableCreator();
+// for(int i =0; i < vectors.length; i++){
+// c.apply(i, vectors[i]);
+// }
+// return c.get();
+// }
+//
+
+ public static WritableBatch get(int recordCount, IntObjectOpenHashMap<ValueVector<?>> fields){
+ WritableCreator creator = new WritableCreator(recordCount);
+ fields.forEach(creator);
+ return creator.get();
+
+ }
+
+ private static class WritableCreator implements IntObjectProcedure<ValueVector<?>>{
+
+ List<ByteBuf> buffers = Lists.newArrayList();
+ List<FieldMetadata> metadata = Lists.newArrayList();
+ private int recordCount;
+
+
+ public WritableCreator(int recordCount) {
+ super();
+ this.recordCount = recordCount;
+ }
+
+ @Override
+ public void apply(int key, ValueVector<?> value) {
+ metadata.add(value.getMetadata());
+ for(ByteBuf b : value.getBuffers()){
+ buffers.add(b);
+ b.retain();
+ }
+ // allocate new buffer to release hold on old buffer.
+ value.allocateNew(value.capacity());
+ }
+
+
+ public WritableBatch get(){
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+ WritableBatch b = new WritableBatch(batchDef, buffers);
+ return b;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
index 912e02d..b32f067 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.record.MaterializedField;
/**
* Abstract class that fixed value vectors are derived from.
@@ -27,12 +29,12 @@ import org.apache.drill.exec.memory.BufferAllocator;
abstract class AbstractFixedValueVector<T extends AbstractFixedValueVector<T>> extends BaseValueVector<T> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFixedValueVector.class);
- private final int widthInBits;
+ protected final int widthInBits;
protected int longWords = 0;
-
- public AbstractFixedValueVector(int fieldId, BufferAllocator allocator, int widthInBits) {
- super(fieldId, allocator);
+
+ public AbstractFixedValueVector(MaterializedField field, BufferAllocator allocator, int widthInBits) {
+ super(field, allocator);
this.widthInBits = widthInBits;
}
@@ -56,5 +58,16 @@ abstract class AbstractFixedValueVector<T extends AbstractFixedValueVector<T>> e
longWords = 0;
}
+ @Override
+ public void setRecordCount(int recordCount) {
+ this.data.writerIndex(recordCount*(widthInBits/8));
+ super.setRecordCount(recordCount);
+ }
+
+
+
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
index 8d524b2..b001add 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
@@ -19,20 +19,25 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBuf;
+import java.util.Random;
+
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.record.MaterializedField;
public abstract class BaseValueVector<T extends BaseValueVector<T>> implements ValueVector<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
protected final BufferAllocator allocator;
protected ByteBuf data = DeadBuf.DEAD_BUFFER;
- protected int valueCount = 0;
- protected final int fieldId;
+ protected int maxValueCount = 0;
+ protected final MaterializedField field;
+ private int recordCount;
- public BaseValueVector(int fieldId, BufferAllocator allocator) {
+ public BaseValueVector(MaterializedField field, BufferAllocator allocator) {
this.allocator = allocator;
- this.fieldId = fieldId;
+ this.field = field;
}
public final void allocateNew(int valueCount){
@@ -42,35 +47,42 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
resetAllocation(valueCount, newBuf);
}
- protected abstract int getAllocationSize(int valueCount);
+ protected abstract int getAllocationSize(int maxValueCount);
protected abstract void childResetAllocation(int valueCount, ByteBuf buf);
protected abstract void childCloneMetadata(T other);
protected abstract void childClear();
- protected final void resetAllocation(int valueCount, ByteBuf buf){
+ /**
+ * Update the current buffer allocation utilize the provided allocation.
+ * @param valueCount
+ * @param buf
+ */
+ protected final void resetAllocation(int maxValueCount, ByteBuf buf){
clear();
- this.valueCount = valueCount;
+ buf.retain();
+ this.maxValueCount = maxValueCount;
this.data = buf;
- childResetAllocation(valueCount, buf);
+ childResetAllocation(maxValueCount, buf);
}
public final void cloneMetadata(T other){
- other.valueCount = this.valueCount;
+ other.maxValueCount = this.maxValueCount;
}
+
@Override
public final void cloneInto(T vector) {
- vector.allocateNew(valueCount);
+ vector.allocateNew(maxValueCount);
data.writeBytes(vector.data);
cloneMetadata(vector);
- childResetAllocation(valueCount, vector.data);
+ childResetAllocation(maxValueCount, vector.data);
}
@Override
public final void transferTo(T vector) {
vector.data = this.data;
cloneMetadata(vector);
- childResetAllocation(valueCount, data);
+ childResetAllocation(maxValueCount, data);
clear();
}
@@ -78,7 +90,7 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
if(this.data != DeadBuf.DEAD_BUFFER){
this.data.release();
this.data = DeadBuf.DEAD_BUFFER;
- this.valueCount = 0;
+ this.maxValueCount = 0;
}
childClear();
}
@@ -88,8 +100,8 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
*
* @return
*/
- public int size() {
- return valueCount;
+ public int capacity() {
+ return maxValueCount;
}
@Override
@@ -98,8 +110,48 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
}
@Override
- public ByteBuf getBuffer() {
- return data;
+ public ByteBuf[] getBuffers() {
+ return new ByteBuf[]{data};
+ }
+
+ /**
+  * @return the materialized field this vector was constructed with
+  */
+ public MaterializedField getField() {
+   return this.field;
+ }
+
+
+ /**
+  * @return the record count last stored through {@code setRecordCount}
+  */
+ public int getRecordCount() {
+   return this.recordCount;
+ }
+
+ /**
+  * Stores the number of records this vector holds.
+  *
+  * @param recordCount the record count to remember
+  */
+ public void setRecordCount(final int recordCount) {
+   this.recordCount = recordCount;
+ }
+
+ @Override
+ public FieldMetadata getMetadata() {
+ int len = 0;
+ for(ByteBuf b : getBuffers()){
+ len += b.writerIndex();
+ }
+ return FieldMetadata.newBuilder().setDef(getField().getDef()).setValueCount(getRecordCount()).setBufferLength(len).build();
+ }
+
+ /**
+  * Drops the current contents and points this vector at the supplied buffer, using the metadata's value count
+  * as the new maximum value count.
+  *
+  * @param metadata metadata describing the incoming data; only its value count is read here
+  * @param data the buffer to adopt
+  */
+ @Override
+ public void setTo(FieldMetadata metadata, ByteBuf data) {
+   clear();
+   final int incomingValueCount = metadata.getValueCount();
+   resetAllocation(incomingValueCount, data);
+ }
+
+ @Override
+ public void randomizeData() {
+ if(this.data != DeadBuf.DEAD_BUFFER){
+ Random r = new Random();
+ for(int i =0; i < data.capacity()-8; i+=8){
+ data.setLong(i, r.nextLong());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
new file mode 100644
index 0000000..533e3bd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Bit.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.RecordField.ValueMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Describes a vector which holds a number of true/false values.
+ */
+public class Bit extends AbstractFixedValueVector<Bit> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Bit.class);
+
+ public Bit(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 1);
+ }
+
+
+// /** Returns true or false for the specified bit index.
+// * The index should be less than the OpenBitSet size
+// */
+// public boolean get(int index) {
+// assert index >= 0 && index < this.valueCount;
+// int i = index >> 3; // div 8
+// // signed shift will keep a negative index and force an
+// // array-index-out-of-bounds-exception, removing the need for an explicit check.
+// int bit = index & 0x3f; // mod 64
+// long bitmask = 1L << bit;
+// return (data.getLong(i) & bitmask) != 0;
+// }
+
+ public int getBit(int index) {
+
+ assert index >= 0 && index < this.maxValueCount;
+ int i = 8*(index >> 6); // div 8
+ int bit = index & 0x3f; // mod 64
+ return ((int) (data.getLong(i) >>> bit)) & 0x01;
+ }
+
+ /** Sets the bit at the specified index.
+ * The index should be less than the OpenBitSet size.
+ */
+ public void set(int index) {
+ assert index >= 0 && index < this.maxValueCount;
+ int wordNum = index >> 3;
+ int bit = index & 0x3f;
+ long bitmask = 1L << bit;
+ data.setLong(wordNum, data.getLong(wordNum) | bitmask);
+ }
+
+ public void clear(int index) {
+ assert index >= 0 && index < this.maxValueCount;
+ int wordNum = index >> 3;
+ int bit = index & 0x03f;
+ long bitmask = 1L << bit;
+ data.setLong(wordNum, data.getLong(wordNum) & ~bitmask);
+ }
+
+
+
+ /** Clears a range of bits. Clearing past the end does not change the size of the set.
+ *
+ * @param startBitIndex lower index
+ * @param lastBitIndex one-past the last bit to clear
+ */
+ private void clear2(int startBitIndex, int lastBitIndex) {
+ if (lastBitIndex <= startBitIndex) return;
+
+ int firstWordStart = (startBitIndex>>3);
+ if (firstWordStart >= this.longWords) return;
+
+ // since endIndex is one past the end, this is index of the last
+ // word to be changed.
+ int lastWordStart = ((lastBitIndex-1)>>3);
+
+ long startmask = -1L << startBitIndex;
+ long endmask = -1L >>> -lastBitIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
+
+ // invert masks since we are clearing
+ startmask = ~startmask;
+ endmask = ~endmask;
+
+ if (firstWordStart == lastWordStart) {
+ data.setLong(firstWordStart, data.getLong(firstWordStart) & (startmask | endmask));
+ return;
+ }
+ data.setLong(firstWordStart, data.getLong(firstWordStart) & startmask);
+
+ int middle = Math.min(this.longWords, lastWordStart);
+
+ for(int i =firstWordStart+8; i < middle; i += 8){
+ data.setLong(i, 0L);
+ }
+ if (lastWordStart < this.longWords) {
+ data.setLong(lastWordStart, data.getLong(lastWordStart) & endmask);
+ }
+ }
+
+ public void setAllFalse(){
+ clear(0, maxValueCount);
+ }
+
+
+ public void clear(int startIndex, int endIndex) {
+ if (endIndex <= startIndex) return;
+
+ int startWord = (startIndex >> 6);
+ if (startWord >= longWords) return;
+
+ // since endIndex is one past the end, this is index of the last
+ // word to be changed.
+ int endWord = ((endIndex - 1) >> 6);
+
+ long startmask = -1L << startIndex;
+ long endmask = -1L >>> -endIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
+
+ // invert masks since we are clearing
+ startmask = ~startmask;
+ endmask = ~endmask;
+
+ int startWordPos = startWord * 8;
+ if (startWord == endWord) {
+ data.setLong(startWordPos, data.getLong(startWordPos) & (startmask | endmask));
+ return;
+ }
+
+ int endWordPos = endWord * 8;
+
+ data.setLong(startWordPos, data.getLong(startWordPos) & startmask);
+
+ int middle = Math.min(longWords, endWord)*8;
+
+
+ for(int i =startWordPos+8; i < middle; i += 8){
+ data.setLong(i, 0L);
+ }
+
+ if (endWordPos < startWordPos) {
+ data.setLong(endWordPos, data.getLong(endWordPos) & endmask);
+ }
+ }
+
+
+ @Override
+ public Object getObject(int index) {
+ return this.getBit(index);
+ }
+
+
+}