Posted to commits@hive.apache.org by he...@apache.org on 2012/09/30 22:41:03 UTC
svn commit: r1392105 [1/7] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimiz...
Author: heyongqiang
Date: Sun Sep 30 20:41:01 2012
New Revision: 1392105
URL: http://svn.apache.org/viewvc?rev=1392105&view=rev
Log:
HIVE-2206: add a new optimizer for query correlation discovery and optimization (Yin Huai via He Yongqiang)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java
hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q
hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out
hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer2.q.out
hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer3.q.out
hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer4.q.out
hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer5.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Sep 30 20:41:01 2012
@@ -495,6 +495,7 @@ public class HiveConf extends Configurat
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations
// optimize skewed join by changing the query plan at compile time
HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sun Sep 30 20:41:01 2012
@@ -928,6 +928,12 @@
</property>
<property>
+ <name>hive.optimize.correlation</name>
+ <value>false</value>
+ <description>exploit intra-query correlations.</description>
+</property>
+
+<property>
<name>hive.exec.dynamic.partition</name>
<value>true</value>
<description>Whether or not to allow dynamic partitions in DML/DDL.</description>
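
For reference, the new flag can also be toggled programmatically through the HiveConf enum constant added above. A minimal sketch (not part of this commit; the class name is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableCorrelationOptimizer {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // The optimizer is off by default, matching the template entry above.
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION, true);
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)); // true
      }
    }

Equivalently, a session can run "set hive.optimize.correlation=true;" or the property can be set in hive-site.xml.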
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Sun Sep 30 20:41:01 2012
@@ -28,7 +28,10 @@ public enum OperatorType implements org.
LATERALVIEWJOIN(14),
LATERALVIEWFORWARD(15),
HASHTABLESINK(16),
- HASHTABLEDUMMY(17);
+ HASHTABLEDUMMY(17),
+ CORRELATIONCOMPOSITE(18),
+ CORRELATIONLOCALSIMULATIVEREDUCESINK(19),
+ CORRELATIONREDUCERDISPATCH(20);
private final int value;
@@ -85,6 +88,12 @@ public enum OperatorType implements org.
return HASHTABLESINK;
case 17:
return HASHTABLEDUMMY;
+ case 18:
+ return CORRELATIONCOMPOSITE;
+ case 19:
+ return CORRELATIONLOCALSIMULATIVEREDUCESINK;
+ case 20:
+ return CORRELATIONREDUCERDISPATCH;
default:
return null;
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BaseReduceSinkOperator
+ **/
+public abstract class BaseReduceSinkOperator<T extends BaseReduceSinkDesc> extends
+ TerminalOperator<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class
+ .getName());
+
+ /**
+ * The evaluators for the key columns. Key columns decide the sort order on
+ * the reducer side. Key columns are passed to the reducer in the "key".
+ */
+ protected transient ExprNodeEvaluator[] keyEval;
+ /**
+ * The evaluators for the value columns. Value columns are passed to reducer
+ * in the "value".
+ */
+ protected transient ExprNodeEvaluator[] valueEval;
+ /**
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+ * Hive language). Partition columns decide the reducer that the current row
+ * goes to. Partition columns are not passed to reducer.
+ */
+ protected transient ExprNodeEvaluator[] partitionEval;
+
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+ // ready
+ protected transient Serializer keySerializer;
+ protected transient boolean keyIsText;
+ protected transient Serializer valueSerializer;
+ protected transient int tag;
+ protected transient byte[] tagByte = new byte[1];
+ protected transient int numDistributionKeys;
+ protected transient int numDistinctExprs;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ LOG.info("Using tag = " + tag);
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ isFirstRow = true;
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected transient InspectableObject tempInspectableObject = new InspectableObject();
+ protected transient HiveKey keyWritable = new HiveKey();
+ protected transient Writable value;
+
+ protected transient StructObjectInspector keyObjectInspector;
+ protected transient StructObjectInspector valueObjectInspector;
+ protected transient ObjectInspector[] partitionObjectInspectors;
+
+ protected transient Object[][] cachedKeys;
+ protected transient Object[] cachedValues;
+ protected transient List<List<Integer>> distinctColIndices;
+
+ protected boolean isFirstRow;
+
+ protected transient Random random;
+
+ /**
+ * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
+ * column indices for group by.
+ * Puts the return values into a StructObjectInspector with output column
+ * names.
+ *
+ * If distinctColIndices is empty, the object inspector is the same as
+ * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+ */
+ protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+ ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
+ List<String> outputColNames,
+ int length, ObjectInspector rowInspector)
+ throws HiveException {
+ int inspectorLen = evals.length > length ? length + 1 : evals.length;
+ List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
+
+ // keys
+ ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
+ sois.addAll(Arrays.asList(fieldObjectInspectors));
+
+ if (evals.length > length) {
+ // union keys
+ List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
+ for (List<Integer> distinctCols : distinctColIndices) {
+ List<String> names = new ArrayList<String>();
+ List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+ int numExprs = 0;
+ for (int i : distinctCols) {
+ names.add(HiveConf.getColumnInternalName(numExprs));
+ eois.add(evals[i].initialize(rowInspector));
+ numExprs++;
+ }
+ uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+ }
+ UnionObjectInspector uoi =
+ ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
+ sois.add(uoi);
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois);
+ }
+
+ @Override
+ public abstract void processOp(Object row, int tag) throws HiveException;
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return "BaseReduceSink";
+ }
+}
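
As a reading aid, here is a hypothetical minimal subclass (illustrative only; its name and behavior are not part of this commit) showing the contract the abstract class leaves open: the base initializeOp() wires up keyEval/valueEval/partitionEval and the serializers, and a concrete sink implements processOp() to evaluate and emit rows:

    import org.apache.hadoop.hive.ql.exec.BaseReduceSinkOperator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;

    // Hypothetical example; not part of this patch.
    public class NoOpReduceSinkOperator extends BaseReduceSinkOperator<BaseReduceSinkDesc> {
      private static final long serialVersionUID = 1L;

      @Override
      public void processOp(Object row, int tag) throws HiveException {
        // Evaluate the value columns wired up by the base class's initializeOp().
        Object[] values = new Object[valueEval.length];
        for (int i = 0; i < valueEval.length; i++) {
          values[i] = valueEval[i].evaluate(row);
        }
        // A real subclass would serialize these with keySerializer/valueSerializer
        // and emit them; this sketch simply drops the row.
      }
    }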
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Correlation composite operator implementation. This operator is used only in the map phase for
+ * sharing a table scan. Suppose that there are multiple operation paths (e.g., two different
+ * predicates on a table) that share a common table. A row will be processed by all of these
+ * operation paths. To record which operation paths actually forward a row, the
+ * CorrelationCompositeOperator is used. For every row, this operator buffers the rows forwarded
+ * by its parents and then tags the row with an operation path tag indicating which paths
+ * forwarded it. Because the operation path tag used in the ReduceSinkOperator is 1 byte, this
+ * operator can have at most 8 parents (operation paths). For example, suppose that the common
+ * table is T and predicates P1 and P2 are used in sub-queries SQ1 and SQ2, respectively. The
+ * CorrelationCompositeOperator will apply P1 and P2 to the row and tag the record based on
+ * whether P1 or P2 is true.
+ **/
+public class CorrelationCompositeOperator extends Operator<CorrelationCompositeDesc> implements
+ Serializable {
+
+ public static enum Counter {
+ FORWARDED
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private ReduceSinkOperator correspondingReduceSinkOperators;
+
+ private transient final LongWritable forwarded_count;
+
+ private transient boolean isFirstRow;
+
+ private int[] allOperationPathTags;
+
+ private Object[] rowBuffer; // buffer the output from multiple parents
+
+ public CorrelationCompositeOperator() {
+ super();
+ forwarded_count = new LongWritable();
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ isFirstRow = true;
+ rowBuffer = new Object[parentOperators.size()];
+ correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator();
+ allOperationPathTags = conf.getAllOperationPathTags();
+ statsMap.put(Counter.FORWARDED, forwarded_count);
+ outputObjInspector =
+ ObjectInspectorUtils.getStandardObjectInspector(outputObjInspector,
+ ObjectInspectorCopyOption.JAVA);
+
+ // initialize its children
+ initializeChildren(hconf);
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ rowBuffer[tag] =
+ ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag],
+ ObjectInspectorCopyOption.JAVA);
+ }
+
+ private void evaluateBuffer() throws HiveException {
+ List<Integer> operationPathTags = new ArrayList<Integer>();
+ boolean isForward = false;
+ Object forwardedRow = null;
+ for (int i = 0; i < rowBuffer.length; i++) {
+ if (rowBuffer[i] != null) {
+ isForward = true;
+ operationPathTags.add(allOperationPathTags[i]);
+ if (forwardedRow == null) {
+ forwardedRow = rowBuffer[i];
+ }
+ }
+ }
+ if (isForward) {
+ assert correspondingReduceSinkOperators != null;
+ correspondingReduceSinkOperators.setOperationPathTags(operationPathTags);
+ forwarded_count.set(forwarded_count.get() + 1);
+ forward(forwardedRow, null);
+ }
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ }
+
+ @Override
+ public void setRowNumber(long rowNumber) throws HiveException {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ if (isFirstRow) {
+ for (int i = 0; i < rowBuffer.length; i++) {
+ rowBuffer[i] = null;
+ }
+ isFirstRow = false;
+ } else {
+ evaluateBuffer();
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ evaluateBuffer();
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CCO";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONCOMPOSITE;
+ }
+
+}
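
To make the buffering scheme concrete: each parent operation path owns one slot in rowBuffer, and when the buffer is flushed the row is forwarded once, tagged with every path that produced it. A standalone sketch of evaluateBuffer()'s logic (illustrative; the tag values are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class CompositeBufferDemo {
      // One slot per parent operation path, as in rowBuffer above.
      static Object[] rowBuffer = new Object[3];
      static int[] allOperationPathTags = {1 << 0, 1 << 1, 1 << 2};

      // Mirrors evaluateBuffer(): forward once, tagged with every path that fired.
      static void evaluateBuffer() {
        List<Integer> operationPathTags = new ArrayList<Integer>();
        Object forwardedRow = null;
        for (int i = 0; i < rowBuffer.length; i++) {
          if (rowBuffer[i] != null) {
            operationPathTags.add(allOperationPathTags[i]);
            if (forwardedRow == null) {
              forwardedRow = rowBuffer[i];
            }
          }
          rowBuffer[i] = null;
        }
        if (forwardedRow != null) {
          System.out.println("forward " + forwardedRow + " with tags " + operationPathTags);
        }
      }

      public static void main(String[] args) {
        // A row that passes the predicates of paths 0 and 2 but not path 1.
        rowBuffer[0] = "r1";
        rowBuffer[2] = "r1";
        evaluateBuffer(); // forward r1 with tags [1, 4]
      }
    }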
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * CorrelationLocalSimulativeReduceSinkOperator simulates a ReduceSinkOperator and sends output to
+ * another operator (JOIN or GBY). CorrelationLocalSimulativeReduceSinkOperator is used only in the
+ * reduce phase. Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY
+ * operator. A CorrelationLocalSimulativeReduceSinkOperator takes care of the startGroup and
+ * endGroup actions of its succeeding JOIN or GBY operator.
+ * Example: a query involves a JOIN operator and a GBY operator, and the GBY operator consumes the
+ * output of the JOIN operator. In this case, if the join keys and the group by keys are the same,
+ * we do not need to shuffle the data again, since the data has already been partitioned by the
+ * JOIN operator. Thus, in the CorrelationOptimizer, the ReduceSinkOperator between the JOIN and
+ * GBY operators is replaced by a CorrelationLocalSimulativeReduceSinkOperator, and the JOIN
+ * operator and the GBY operator are executed in a single reduce phase.
+ **/
+public class CorrelationLocalSimulativeReduceSinkOperator
+ extends BaseReduceSinkOperator<CorrelationLocalSimulativeReduceSinkDesc> {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Log LOG = LogFactory.getLog(
+ CorrelationLocalSimulativeReduceSinkOperator.class.getName());
+
+ private transient TableDesc keyTableDesc;
+
+ private transient Deserializer inputKeyDeserializer;
+
+ private transient SerDe inputValueDeserializer;
+
+ private transient ByteWritable tagWritable;
+
+ private transient ObjectInspector outputKeyObjectInspector;
+ private transient ObjectInspector outputValueObjectInspector;
+
+ private List<Object> forwardedRow;
+ private Object keyObject;
+ private Object valueObject;
+
+ private BytesWritable groupKey;
+
+ private static String[] fieldNames;
+
+ static {
+ List<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ public CorrelationLocalSimulativeReduceSinkOperator() {
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ forwardedRow = new ArrayList<Object>(3);
+ tagByte = new byte[1];
+ tagWritable = new ByteWritable();
+ tempInspectableObject = new InspectableObject();
+ keyWritable = new HiveKey();
+ assert childOperatorsArray.length == 1;
+ try {
+ keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getKeyCols()) {
+ keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ numDistributionKeys = conf.getNumDistributionKeys();
+ distinctColIndices = conf.getDistinctColumnIndices();
+ numDistinctExprs = distinctColIndices.size();
+
+ valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+ i = 0;
+ for (ExprNodeDesc e : conf.getValueCols()) {
+ valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
+ tag = conf.getTag();
+ tagByte[0] = (byte) tag;
+ tagWritable.set(tagByte[0]);
+ LOG.info("Using tag = " + tag);
+
+ // Assign the keyTableDesc field (it is used in error messages below); do not shadow it.
+ keyTableDesc = conf.getKeySerializeInfo();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
+ keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+ outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+ TableDesc valueTableDesc = conf.getValueSerializeInfo();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ inputValueDeserializer.initialize(null, valueTableDesc
+ .getProperties());
+ outputValueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+ ObjectInspector rowInspector = inputObjInspectors[0];
+
+ keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
+ distinctColIndices,
+ conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+ valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+ .getOutputValueColumnNames(), rowInspector);
+ int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+ numDistributionKeys;
+ cachedKeys = new Object[numKeys][keyLen];
+ cachedValues = new Object[valueEval.length];
+ assert cachedKeys.length == 1;
+
+ List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(outputKeyObjectInspector);
+ ois.add(outputValueObjectInspector);
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+
+ LOG.info("Simulative ReduceSink inputObjInspectors"
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+
+ LOG.info("Simulative ReduceSink outputObjInspectors "
+ + this.getChildOperators().get(0).getParentOperators().indexOf(this) +
+ " " + ((StructObjectInspector) outputObjInspector).getTypeName());
+
+ initializeChildren(hconf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ try {
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+ valueObject = inputValueDeserializer.deserialize(value);
+
+ // Evaluate the keys
+ Object[] distributionKeys = new Object[numDistributionKeys];
+ for (int i = 0; i < numDistributionKeys; i++) {
+ distributionKeys[i] = keyEval[i].evaluate(row);
+ }
+
+ if (numDistinctExprs > 0) {
+ // with distinct key(s)
+ for (int i = 0; i < numDistinctExprs; i++) {
+ System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+ Object[] distinctParameters =
+ new Object[distinctColIndices.get(i).size()];
+ for (int j = 0; j < distinctParameters.length; j++) {
+ distinctParameters[j] =
+ keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+ }
+ cachedKeys[i][numDistributionKeys] =
+ new StandardUnion((byte) i, distinctParameters);
+ }
+ } else {
+ // no distinct key
+ System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+ }
+
+ for (int i = 0; i < cachedKeys.length; i++) {
+ if (keyIsText) {
+ Text key = (Text) keySerializer.serialize(cachedKeys[i],
+ keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ // Must be BytesWritable
+ BytesWritable key = (BytesWritable) keySerializer.serialize(
+ cachedKeys[i], keyObjectInspector);
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ }
+
+ if (!keyWritable.equals(groupKey)) {
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
+ }
+ if (groupKey == null) { // the first group
+ groupKey = new BytesWritable();
+ } else {
+ // if its child has not been ended, end it
+ if (!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ if (!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+ childOperatorsArray[0].startGroup();
+ childOperatorsArray[0].setGroupKeyObject(keyObject);
+ childOperatorsArray[0].setBytesWritableGroupKey(groupKey);
+ }
+ }
+ forwardedRow.clear();
+ forwardedRow.add(keyObject);
+ forwardedRow.add(valueObject);
+ forwardedRow.add(tagWritable);
+ forward(forwardedRow, outputObjInspector);
+ }
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort) {
+ Operator<? extends OperatorDesc> child = childOperatorsArray[0];
+ if (child.allInitializedParentsAreClosed()) {
+ LOG.info("All parents of " + child.getName() + " (id: " + child.getIdentifier() +
+ ") has been closed. Invoke its endGroup");
+ childOperatorsArray[0].endGroup();
+ }
+ }
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ // do nothing
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ // do nothing
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CLSReduceSink";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONLOCALSIMULATIVEREDUCESINK;
+ }
+}
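
The heart of the simulative sink is the group-boundary bookkeeping in processOp() above: whenever the serialized key differs from the previous group key, the child's current group is ended and a new one is started before the row is forwarded. A standalone sketch of that control flow (illustrative; the real implementation compares BytesWritable keys):

    public class GroupBoundaryDemo {
      static String groupKey = null;

      static void process(String key) {
        if (!key.equals(groupKey)) {
          if (groupKey != null) {
            System.out.println("endGroup(" + groupKey + ")");   // child.endGroup()
          }
          groupKey = key;
          System.out.println("startGroup(" + groupKey + ")");   // child.startGroup()
        }
        System.out.println("  forward row with key " + key);
      }

      public static void main(String[] args) {
        // Rows arrive already sorted by key, as guaranteed by the upstream shuffle.
        for (String k : new String[] {"a", "a", "b", "c", "c"}) {
          process(k);
        }
      }
    }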
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * CorrelationReducerDispatchOperator is an operator used in a MapReduce job optimized by the
+ * CorrelationOptimizer. If used, CorrelationReducerDispatchOperator is the first operator in the
+ * reduce phase. In the case that multiple operation paths are merged into a single one, it
+ * dispatches each record to the corresponding JOIN or GBY operators. Every child of this operator
+ * is associated with a DispatchHandler, which evaluates the input row of this operator and then
+ * selects the corresponding fields for its associated child.
+ */
+public class CorrelationReducerDispatchOperator extends Operator<CorrelationReducerDispatchDesc>
+ implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static String[] fieldNames;
+ static {
+ List<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+ fieldNameArray.add(r.toString());
+ }
+ fieldNames = fieldNameArray.toArray(new String[0]);
+ }
+
+ protected static class DispatchHandler {
+
+ protected Log l4j = LogFactory.getLog(this.getClass().getName());
+
+ private final ObjectInspector[] inputObjInspector;
+ private ObjectInspector outputObjInspector;
+ private ObjectInspector keyObjInspector;
+ private ObjectInspector valueObjInspector;
+ private final byte inputTag;
+ private final byte outputTag;
+ private final byte childIndx;
+ private final ByteWritable outputTagByteWritable;
+ private final SelectDesc keySelectDesc;
+ private final SelectDesc valueSelectDesc;
+ private ExprNodeEvaluator[] keyEval;
+ private ExprNodeEvaluator[] eval;
+
+ // counters for debugging
+ private transient long cntr = 0;
+ private transient long nextCntr = 1;
+
+ private long getNextCntr(long cntr) {
+ // A very simple counter to keep track of the number of rows processed by an
+ // operator. It logs every 1 million rows, and more frequently before that.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+ return 10 * cntr;
+ }
+
+ public long getCntr() {
+ return this.cntr;
+ }
+
+ private final Log LOG;
+ private final boolean isLogInfoEnabled;
+ private final String id;
+
+ public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx,
+ byte outputTag,
+ SelectDesc valueSelectDesc, SelectDesc keySelectDesc, Log LOG, String id)
+ throws HiveException {
+ this.inputObjInspector = inputObjInspector;
+ assert this.inputObjInspector.length == 1;
+ this.inputTag = inputTag;
+ this.childIndx = childIndx;
+ this.outputTag = outputTag;
+ this.valueSelectDesc = valueSelectDesc;
+ this.keySelectDesc = keySelectDesc;
+ this.outputTagByteWritable = new ByteWritable(outputTag);
+ this.LOG = LOG;
+ this.isLogInfoEnabled = LOG.isInfoEnabled();
+ this.id = id;
+ init();
+ }
+
+ private void init() throws HiveException {
+ List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ if (keySelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((List) inputObjInspector[0]).get(0));
+ } else {
+ List<ExprNodeDesc> colList = this.keySelectDesc.getColList();
+ keyEval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ keyObjInspector =
+ initEvaluatorsAndReturnStruct(keyEval, keySelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(0).getFieldObjectInspector());
+
+ ois.add(keyObjInspector);
+ l4j.info("Key: input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ if (valueSelectDesc.isSelStarNoCompute()) {
+ ois.add((ObjectInspector) ((List) inputObjInspector[0]).get(1));
+ } else {
+ List<ExprNodeDesc> colList = this.valueSelectDesc.getColList();
+ eval = new ExprNodeEvaluator[colList.size()];
+ for (int k = 0; k < colList.size(); k++) {
+ assert (colList.get(k) != null);
+ eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+ }
+ valueObjInspector =
+ initEvaluatorsAndReturnStruct(eval, valueSelectDesc
+ .getOutputColumnNames(), ((StandardStructObjectInspector) inputObjInspector[0])
+ .getAllStructFieldRefs().get(1).getFieldObjectInspector());
+
+ ois.add(valueObjInspector);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT inputOIForThisTag"
+ + ((StructObjectInspector) inputObjInspector[0]).getTypeName());
+ }
+ ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+ l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+ + ", SELECT outputObjInspector"
+ + ((StructObjectInspector) outputObjInspector).getTypeName());
+ }
+
+ public ObjectInspector getOutputObjInspector() {
+ return outputObjInspector;
+ }
+
+ public Object process(Object row) throws HiveException {
+ List<Object> keyOutput = keyEval == null ? null : new ArrayList<Object>(keyEval.length);
+ Object[] valueOutput = eval == null ? null : new Object[eval.length];
+ List<Object> outputRow = new ArrayList<Object>(3);
+ List thisRow = (List) row;
+ if (keySelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(0));
+ } else {
+ Object key = thisRow.get(0);
+ for (int j = 0; j < keyEval.length; j++) {
+ try {
+ keyOutput.add(keyEval[j].evaluate(key));
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + keySelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(keyOutput);
+ }
+
+ if (valueSelectDesc.isSelStarNoCompute()) {
+ outputRow.add(thisRow.get(1));
+ } else {
+ Object value = thisRow.get(1);
+ for (int j = 0; j < eval.length; j++) {
+ try {
+ valueOutput[j] = eval[j].evaluate(value);
+ } catch (HiveException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new HiveException("Error evaluating "
+ + valueSelectDesc.getColList().get(j).getExprString(), e);
+ }
+ }
+ outputRow.add(valueOutput);
+ }
+ outputRow.add(outputTagByteWritable);
+
+ if (isLogInfoEnabled) {
+ cntr++;
+ if (cntr == nextCntr) {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarding " + cntr + " rows");
+ nextCntr = getNextCntr(cntr);
+ }
+ }
+
+ return outputRow;
+ }
+
+ public void printCloseOpLog() {
+ LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+ + outputTag + "), forwarded " + cntr + " rows");
+ }
+ }
+
+ // inputTag->(Child->List<outputTag>)
+ private Map<Integer, Map<Integer, List<Integer>>> dispatchConf;
+ // inputTag->(Child->List<SelectDesc>)
+ private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf;
+ // inputTag->(Child->List<SelectDesc>)
+ private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf;
+ // inputTag->(Child->List<DispatchHandler>)
+ private Map<Integer, Map<Integer, List<DispatchHandler>>> dispatchHandlers;
+ // Child->(outputTag->DispatchHandler)
+ private Map<Integer, Map<Integer, DispatchHandler>> child2OutputTag2DispatchHandlers;
+ // Child->Child's inputObjInspectors
+ private Map<Integer, ObjectInspector[]> childInputObjInspectors;
+
+ private int operationPathTag;
+ private int inputTag;
+
+ private Object[] lastDispatchedRows;
+ private int[] lastDispatchedTags;
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ dispatchConf = conf.getDispatchConf();
+ dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf();
+ dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf();
+ dispatchHandlers = new HashMap<Integer, Map<Integer, List<DispatchHandler>>>();
+ for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+ Map<Integer, List<DispatchHandler>> tmp =
+ new HashMap<Integer, List<DispatchHandler>>();
+ for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+ tmp.put(child2outputTag.getKey(), new ArrayList<DispatchHandler>());
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ ObjectInspector[] thisInputObjectInspector =
+ new ObjectInspector[] {inputObjInspectors[entry.getKey()]};
+ Integer thisInputTag = entry.getKey();
+ Integer thisChildIndx = child2outputTag.getKey();
+ SelectDesc thisValueSelectDesc = dispatchValueSelectDescConf.get(thisInputTag)
+ .get(thisChildIndx).get(indx);
+ SelectDesc thisKeySelectDesc = dispatchKeySelectDescConf.get(thisInputTag)
+ .get(thisChildIndx).get(indx);
+ tmp.get(child2outputTag.getKey()).add(
+ new DispatchHandler(thisInputObjectInspector,
+ thisInputTag.byteValue(), thisChildIndx.byteValue(), outputTag.byteValue(),
+ thisValueSelectDesc, thisKeySelectDesc, LOG, id));
+ indx++;
+ }
+ }
+ dispatchHandlers.put(entry.getKey(), tmp);
+ }
+
+ child2OutputTag2DispatchHandlers = new HashMap<Integer, Map<Integer, DispatchHandler>>();
+ for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+ for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+ if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) {
+ child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(),
+ new HashMap<Integer, DispatchHandler>());
+ }
+ int indx = 0;
+ for (Integer outputTag : child2outputTag.getValue()) {
+ child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()).
+ put(outputTag,
+ dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx));
+ indx++;
+ }
+ }
+ }
+
+ childInputObjInspectors = new HashMap<Integer, ObjectInspector[]>();
+ for (Entry<Integer, Map<Integer, DispatchHandler>> entry : child2OutputTag2DispatchHandlers
+ .entrySet()) {
+ Integer l = Collections.max(entry.getValue().keySet());
+ ObjectInspector[] childObjInspectors = new ObjectInspector[l.intValue() + 1];
+ for (Entry<Integer, DispatchHandler> e : entry.getValue().entrySet()) {
+ if (e.getKey().intValue() == -1) {
+ assert childObjInspectors.length == 1;
+ childObjInspectors[0] = e.getValue().getOutputObjInspector();
+ } else {
+ childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector();
+ }
+ }
+ childInputObjInspectors.put(entry.getKey(), childObjInspectors);
+ }
+
+ lastDispatchedRows = new Object[childOperatorsArray.length];
+ lastDispatchedTags = new int[childOperatorsArray.length];
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ lastDispatchedRows[i] = null;
+ lastDispatchedTags[i] = -1;
+ }
+
+ initializeChildren(hconf);
+ }
+
+ // Each child should have its own outputObjInspector
+ @Override
+ protected void initializeChildren(Configuration hconf) throws HiveException {
+ state = State.INIT;
+ LOG.info("Operator " + id + " " + getName() + " initialized");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " +
+ childOperatorsArray[i].getName() +
+ " " + childInputObjInspectors.get(i).length);
+ childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i));
+ if (reporter != null) {
+ childOperatorsArray[i].setReporter(reporter);
+ }
+ }
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ List<Object> thisRow = (List<Object>) row;
+ assert thisRow.size() == 4;
+ operationPathTag = ((ByteWritable) thisRow.get(3)).get();
+ inputTag = ((ByteWritable) thisRow.get(2)).get();
+ forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]);
+ }
+
+ @Override
+ public void forward(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ if ((++outputRows % 1000) == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
+ }
+
+ if (childOperatorsArray == null && childOperators != null) {
+ throw new HiveException("Internal Hive error during operator initialization.");
+ }
+
+ if ((childOperatorsArray == null) || (getDone())) {
+ return;
+ }
+
+ int childrenDone = 0;
+ int forwardFlag = 1;
+ assert childOperatorsArray.length <= 8;
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ Operator<? extends OperatorDesc> o = childOperatorsArray[i];
+ if (o.getDone()) {
+ childrenDone++;
+ } else {
+ int isProcess = (operationPathTag & (forwardFlag << i));
+ if (isProcess != 0) {
+ if (o.getName().equals(GroupByOperator.getOperatorName())) {
+ GroupByOperator gbyop = (GroupByOperator) o;
+ gbyop.setForcedForward(false);
+ if (!this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+ o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+ }
+ }
+ for (int j = 0; j < dispatchHandlers.get(inputTag).get(i).size(); j++) {
+ Object dispatchedRow = dispatchHandlers.get(inputTag).get(i).get(j).process(row);
+ int dispatchedTag = dispatchConf.get(inputTag).get(i).get(j);
+ o.process(dispatchedRow, dispatchedTag);
+ lastDispatchedRows[i] = dispatchedRow;
+ lastDispatchedTags[i] = dispatchedTag;
+ }
+ }
+ if (isProcess == 0 && o.getName().equals(GroupByOperator.getOperatorName())) {
+ if (lastDispatchedRows[i] != null &&
+ !this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+ GroupByOperator gbyop = (GroupByOperator) o;
+ gbyop.setForcedForward(true);
+ o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+ o.process(lastDispatchedRows[i], lastDispatchedTags[i]);
+ }
+ }
+ }
+ }
+
+ // if all children are done, this operator is also done
+ if (childrenDone == childOperatorsArray.length) {
+ setDone(true);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ // log the number of rows forwarded from each dispatcherHandler
+ for (Map<Integer, List<DispatchHandler>> childIndx2DispatchHandlers : dispatchHandlers
+ .values()) {
+ for (List<DispatchHandler> dispatchHandlers : childIndx2DispatchHandlers.values()) {
+ for (DispatchHandler dispatchHandler : dispatchHandlers) {
+ dispatchHandler.printCloseOpLog();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void setGroupKeyObject(Object keyObject) {
+ this.groupKeyObject = keyObject;
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.setGroupKeyObject(keyObject);
+ }
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "CDP";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.CORRELATIONREDUCERDISPATCH;
+ }
+}
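
The dispatching decision in forward() above is a bitmask test: child i receives the row only if bit i of the operation path tag is set, which is why a 1-byte tag caps the number of operation paths at 8. A standalone sketch (the tag value is hypothetical):

    public class OperationPathDispatchDemo {
      public static void main(String[] args) {
        int operationPathTag = 0x5; // paths 0 and 2 forwarded this row
        int forwardFlag = 1;
        for (int i = 0; i < 8; i++) {
          // Same test as in CorrelationReducerDispatchOperator.forward().
          if ((operationPathTag & (forwardFlag << i)) != 0) {
            System.out.println("dispatch row to child " + i);
          }
        }
      }
    }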
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sun Sep 30 20:41:01 2012
@@ -25,6 +25,7 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +62,7 @@ public class ExecReducer extends MapRedu
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
+ private boolean isOperationPathTagged = false;
private long cntr = 0;
private long nextCntr = 1;
@@ -116,6 +118,7 @@ public class ExecReducer extends MapRedu
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
isTagged = gWork.getNeedsTagging();
+ isOperationPathTagged = gWork.getNeedsOperationPathTagging();
try {
keyTableDesc = gWork.getKeyDesc();
inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -164,8 +167,9 @@ public class ExecReducer extends MapRedu
private BytesWritable groupKey;
- ArrayList<Object> row = new ArrayList<Object>(3);
+ List<Object> row = new ArrayList<Object>(4);
ByteWritable tag = new ByteWritable();
+ ByteWritable operationPathTags = new ByteWritable();
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
@@ -188,6 +192,14 @@ public class ExecReducer extends MapRedu
keyWritable.setSize(size);
}
+ operationPathTags.set((byte)0);
+ if (isOperationPathTagged) {
+ // remove the operation path tag
+ int size = keyWritable.getSize() - 1;
+ operationPathTags.set(keyWritable.get()[size]);
+ keyWritable.setSize(size);
+ }
+
if (!keyWritable.equals(groupKey)) {
// If a operator wants to do some work at the beginning of a group
if (groupKey == null) { // the first group
@@ -212,6 +224,7 @@ public class ExecReducer extends MapRedu
l4j.trace("Start Group");
reducer.startGroup();
reducer.setGroupKeyObject(keyObject);
+ reducer.setBytesWritableGroupKey(groupKey);
}
// System.err.print(keyObject.toString());
while (values.hasNext()) {
@@ -234,6 +247,7 @@ public class ExecReducer extends MapRedu
row.add(valueObject[tag.get()]);
// The tag is not used any more, we should remove it.
row.add(tag);
+ row.add(operationPathTags);
if (isLogInfoEnabled) {
cntr++;
if (cntr == nextCntr) {
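
The hunks above strip one trailing byte from each reduce key when operation path tagging is enabled; that byte carries the path bitmask and must not take part in group comparison. A standalone byte-level sketch of the convention (illustrative; the map-side sink is assumed to append the byte):

    import java.util.Arrays;

    public class KeyTagStripDemo {
      public static void main(String[] args) {
        byte[] keyBytes = {10, 20, 30};   // serialized grouping key
        byte pathTags = 0x5;              // hypothetical: paths 0 and 2
        // Map side (assumed): append the tag byte to the key.
        byte[] tagged = Arrays.copyOf(keyBytes, keyBytes.length + 1);
        tagged[keyBytes.length] = pathTags;
        // Reduce side: shrink the key by one byte and recover the tag, so that
        // grouping compares only the real key bytes.
        int size = tagged.length - 1;
        byte recovered = tagged[size];
        System.out.println("key=" + Arrays.toString(Arrays.copyOf(tagged, size))
            + " pathTags=" + recovered);
      }
    }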
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Sun Sep 30 20:41:01 2012
@@ -144,6 +144,13 @@ public class GroupByOperator extends Ope
private long maxMemory;
private float memoryThreshold;
+ private boolean forcedForward; // only used by CorrelationReducerDispatchOperator to keep
+ // this GroupByOperator at the same pace as other
+ // GroupByOperators and JoinOperators.
+ // If true, the data associated with currentKeys is
+ // forwarded even when newKeys equals currentKeys;
+ // otherwise, the normal sort-based aggregation
+ // behavior applies.
+
/**
* This is used to store the position and field names for variable length
* fields.
@@ -385,6 +392,7 @@ public class GroupByOperator extends Ope
memoryMXBean = ManagementFactory.getMemoryMXBean();
maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
memoryThreshold = this.getConf().getMemoryThreshold();
+ forcedForward = false;
initializeChildren(hconf);
}
@@ -793,6 +801,10 @@ public class GroupByOperator extends Ope
}
}
+ public void setForcedForward(boolean forcedForward) {
+ this.forcedForward = forcedForward;
+ }
+
// Non-hash aggregation
private void processAggr(Object row, ObjectInspector rowInspector,
KeyWrapper newKeys) throws HiveException {
@@ -806,11 +818,16 @@ public class GroupByOperator extends Ope
newKeys.equals(currentKeys) : false;
// Forward the current keys if needed for sort-based aggregation
- if (currentKeys != null && !keysAreEqual) {
+ if (currentKeys != null && (!keysAreEqual || forcedForward)) {
forward(currentKeys.getKeyArray(), aggregations);
countAfterReport = 0;
}
+ if (forcedForward) {
+ currentKeys = null;
+ return;
+ }
+
// Need to update the keys?
if (currentKeys == null || !keysAreEqual) {
if (currentKeys == null) {
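
The effect of forcedForward on sort-based aggregation: the current group is flushed even though the incoming key has not changed, and the operator state is reset so the next row starts a fresh group. A condensed standalone sketch (illustrative; real keys are KeyWrapper objects):

    public class ForcedForwardDemo {
      static String currentKey = null;
      static int count = 0;
      static boolean forcedForward = false;

      static void process(String key) {
        boolean keysAreEqual = key.equals(currentKey);
        // Forward the current group if the key changed OR a flush was forced.
        if (currentKey != null && (!keysAreEqual || forcedForward)) {
          System.out.println("forward (" + currentKey + ", count=" + count + ")");
        }
        if (forcedForward) {
          currentKey = null;       // forget state; the next row starts a fresh group
          forcedForward = false;   // the dispatcher controls this via setForcedForward()
          return;
        }
        if (!keysAreEqual) {
          currentKey = key;
          count = 0;
        }
        count++;
      }

      public static void main(String[] args) {
        process("a");
        process("a");
        forcedForward = true;
        process("a"); // prints "forward (a, count=2)" although the key is unchanged
        process("b"); // starts a fresh group
      }
    }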
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Sep 30 20:41:01 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.ap
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.OutputCollector;
@@ -1415,4 +1416,52 @@ public abstract class Operator<T extends
public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
}
+
+ // bytesWritableGroupKey is only used when a query plan is optimized by the CorrelationOptimizer.
+ // CorrelationLocalSimulativeReduceSinkOperator uses this variable to determine when it needs to
+ // start or end the group for its child operator.
+ protected BytesWritable bytesWritableGroupKey;
+
+ public void setBytesWritableGroupKey(BytesWritable groupKey) {
+ if (bytesWritableGroupKey == null) {
+ bytesWritableGroupKey = new BytesWritable();
+ }
+ bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+ }
+
+ public BytesWritable getBytesWritableGroupKey() {
+ return bytesWritableGroupKey;
+ }
+
+ // The sequence number of the current row
+ protected long rowNumber;
+
+ public void initializeRowNumber() {
+ this.rowNumber = 0L;
+ LOG.info("Operator " + id + " " + getName() + " row number initialized to 0");
+ if (childOperators == null) {
+ return;
+ }
+ LOG.info("Initializing row numbers of children of " + id + " " + getName());
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ childOperatorsArray[i].initializeRowNumber();
+ }
+ }
+
+ public void setRowNumber(long rowNumber) throws HiveException {
+ this.rowNumber = rowNumber;
+ if (childOperators == null) {
+ return;
+ }
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ assert rowNumber >= childOperatorsArray[i].getRowNumber();
+ if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+ childOperatorsArray[i].setRowNumber(rowNumber);
+ }
+ }
+ }
+
+ public long getRowNumber() {
+ return rowNumber;
+ }
}
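The deep copy in setBytesWritableGroupKey matters because Hadoop reuses Writable instances across records; a hypothetical sketch (same get()/getSize() calls as the patch, though they are deprecated in later Hadoop):

    import org.apache.hadoop.io.BytesWritable;

    public class GroupKeyCopySketch {
      public static void main(String[] args) {
        BytesWritable reused = new BytesWritable();
        reused.set(new byte[] {1, 2, 3}, 0, 3);

        BytesWritable stored = new BytesWritable();
        // Copy the bytes, as setBytesWritableGroupKey does; keeping a reference
        // to 'reused' would be unsafe once the framework recycles its buffer.
        stored.set(reused.get(), 0, reused.getSize());

        reused.set(new byte[] {9, 9}, 0, 2);      // buffer recycled for the next key
        System.out.println(stored.getLength());   // still 3: the stored key is stable
      }
    }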
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Sun Sep 30 20:41:01 2012
@@ -22,6 +22,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +94,12 @@ public final class OperatorFactory {
HashTableDummyOperator.class));
opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
HashTableSinkOperator.class));
+ opvec.add(new OpTuple<CorrelationCompositeDesc>(CorrelationCompositeDesc.class,
+ CorrelationCompositeOperator.class));
+ opvec.add(new OpTuple<CorrelationReducerDispatchDesc>(CorrelationReducerDispatchDesc.class,
+ CorrelationReducerDispatchOperator.class));
+ opvec.add(new OpTuple<CorrelationLocalSimulativeReduceSinkDesc>(CorrelationLocalSimulativeReduceSinkDesc.class,
+ CorrelationLocalSimulativeReduceSinkOperator.class));
}
public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
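These registrations are consumed through the factory method shown above; a hypothetical call site (assuming the new desc classes have default constructors) looks like:

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;

    // OperatorFactory.get scans opvec for the entry whose descClass matches
    // and instantiates the paired operator class via reflection.
    CorrelationCompositeDesc desc = new CorrelationCompositeDesc();
    Operator<CorrelationCompositeDesc> op =
        OperatorFactory.get(CorrelationCompositeDesc.class);
    op.setConf(desc);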
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -21,179 +21,50 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
-public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+public class ReduceSinkOperator extends BaseReduceSinkOperator<ReduceSinkDesc>
implements Serializable {
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- protected transient ExprNodeEvaluator[] keyEval;
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- protected transient ExprNodeEvaluator[] valueEval;
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- protected transient ExprNodeEvaluator[] partitionEval;
-
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
- // ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
- transient int tag;
- transient byte[] tagByte = new byte[1];
- transient protected int numDistributionKeys;
- transient protected int numDistinctExprs;
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
-
- try {
- keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
- int i = 0;
- for (ExprNodeDesc e : conf.getKeyCols()) {
- keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getValueCols()) {
- valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- i = 0;
- for (ExprNodeDesc e : conf.getPartitionCols()) {
- partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
+ private final List<Integer> operationPathTags = new ArrayList<Integer>(); // tags identifying the operation paths this output belongs to
+ private final byte[] operationPathTagsByte = new byte[1];
- tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
-
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- firstRow = true;
- initializeChildren(hconf);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ public void setOperationPathTags(List<Integer> operationPathTags) {
+ this.operationPathTags.addAll(operationPathTags);
+ int operationPathTagsInt = 0;
+ int tmp = 1;
+ for (Integer operationPathTag: operationPathTags) {
+ operationPathTagsInt += tmp << operationPathTag.intValue();
}
+ operationPathTagsByte[0] = (byte) operationPathTagsInt;
}
- transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
-
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
- transient ObjectInspector[] partitionObjectInspectors;
-
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List<List<Integer>> distinctColIndices;
-
- boolean firstRow;
-
- transient Random random;
-
- /**
- * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
- * column indices for group by.
- * Puts the return values into a StructObjectInspector with output column
- * names.
- *
- * If distinctColIndices is empty, the object inspector is same as
- * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
- */
- protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
- List<String> outputColNames,
- int length, ObjectInspector rowInspector)
- throws HiveException {
- int inspectorLen = evals.length > length ? length + 1 : evals.length;
- List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
-
- // keys
- ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
- sois.addAll(Arrays.asList(fieldObjectInspectors));
-
- if (evals.length > length) {
- // union keys
- List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
- for (List<Integer> distinctCols : distinctColIndices) {
- List<String> names = new ArrayList<String>();
- List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
- int numExprs = 0;
- for (int i : distinctCols) {
- names.add(HiveConf.getColumnInternalName(numExprs));
- eois.add(evals[i].initialize(rowInspector));
- numExprs++;
- }
- uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
- }
- UnionObjectInspector uoi =
- ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
- sois.add(uoi);
- }
- return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+ public List<Integer> getOperationPathTags() {
+ return this.operationPathTags;
}
@Override
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
- if (firstRow) {
- firstRow = false;
+ if (isFirstRow) {
+ isFirstRow = false;
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -267,9 +138,18 @@ public class ReduceSinkOperator extends
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
} else {
// Must be BytesWritable
@@ -279,9 +159,18 @@ public class ReduceSinkOperator extends
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.setSize(keyLength + 1);
+ } else {
+ keyWritable.setSize(keyLength + 2);
+ }
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ if (!this.getConf().getNeedsOperationPathTagging()) {
+ keyWritable.get()[keyLength] = tagByte[0];
+ } else {
+ keyWritable.get()[keyLength] = operationPathTagsByte[0];
+ keyWritable.get()[keyLength + 1] = tagByte[0];
+ }
}
}
keyWritable.setHashCode(keyHashCode);
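The net effect on the serialized key: without operation path tagging the layout stays [key bytes | reducer tag], and with tagging it becomes [key bytes | path bitmask | reducer tag], one byte longer. A worked sketch of the bit packing (hypothetical tag values):

    import java.util.Arrays;
    import java.util.List;

    public class OperationPathTagSketch {
      public static void main(String[] args) {
        // Same packing as setOperationPathTags above: one bit per path tag.
        List<Integer> operationPathTags = Arrays.asList(0, 2);
        int packed = 0;
        for (Integer t : operationPathTags) {
          packed += 1 << t.intValue();
        }
        System.out.println(packed);  // 5, i.e. bits 0 and 2 set

        // Tagged key layout for a 3-byte key and reducer tag 1:
        byte[] tagged = new byte[] {'k', 'e', 'y', (byte) packed, 1};
        System.out.println(Arrays.toString(tagged));  // [107, 101, 121, 5, 1]
      }
    }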
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Sun Sep 30 20:41:01 2012
@@ -80,6 +80,9 @@ public class TableScanOperator extends O
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
+ if (conf != null && conf.isForwardRowNumber()) {
+ setRowNumber(rowNumber + 1);
+ }
forward(row, inputObjInspectors[tag]);
}
@@ -169,6 +172,12 @@ public class TableScanOperator extends O
if (conf == null) {
return;
}
+
+ LOG.info(this.getName() + " forwardRowNumber = " + conf.isForwardRowNumber());
+ if (conf.isForwardRowNumber()) {
+ initializeRowNumber();
+ }
+
if (!conf.isGatherStats()) {
return;
}
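Row-number forwarding ties the two changes above together: the table scan bumps its counter once per row, and Operator.setRowNumber pushes the value down to any child whose counter lags. A stripped-down sketch (hypothetical class outside the real Operator hierarchy):

    import java.util.ArrayList;
    import java.util.List;

    public class RowNumberSketch {
      private long rowNumber;
      private final List<RowNumberSketch> children = new ArrayList<RowNumberSketch>();

      public void addChild(RowNumberSketch child) {
        children.add(child);
      }

      public long getRowNumber() {
        return rowNumber;
      }

      public void setRowNumber(long rowNumber) {
        this.rowNumber = rowNumber;
        for (RowNumberSketch child : children) {
          if (rowNumber != child.getRowNumber()) {
            child.setRowNumber(rowNumber);  // propagate only when the child lags
          }
        }
      }

      public static void main(String[] args) {
        RowNumberSketch scan = new RowNumberSketch();
        RowNumberSketch groupBy = new RowNumberSketch();
        scan.addChild(groupBy);
        scan.setRowNumber(scan.getRowNumber() + 1);  // as in TableScanOperator.processOp
        System.out.println(groupBy.getRowNumber());  // 1
      }
    }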