Posted to commits@hive.apache.org by br...@apache.org on 2015/01/06 19:45:11 UTC

svn commit: r1649899 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkReduceRecordHandler.java optimizer/physical/Vectorizer.java

Author: brock
Date: Tue Jan  6 18:45:10 2015
New Revision: 1649899

URL: http://svn.apache.org/r1649899
Log:
HIVE-9246 - Remove tabs from spark code [Spark Branch] (Brock reviewed by Xuefu)
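
The change itself is a whitespace cleanup: hard tabs in the Spark-branch sources are replaced with the four-space indentation used elsewhere in Hive, plus one small logging fix in the fatal-error path (noted below at the second hunk). As a minimal sketch of the kind of standalone check that can flag such tabs before commit (assumed tooling, not part of this commit; TabCheck is hypothetical):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    /** Hypothetical pre-commit check: report hard tabs in .java files under a root. */
    public class TabCheck {
      public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        try (Stream<Path> paths = Files.walk(root)) {
          paths.filter(p -> p.toString().endsWith(".java"))
               .forEach(TabCheck::report);
        }
      }

      private static void report(Path p) {
        try {
          int lineNo = 0;
          for (String line : Files.readAllLines(p)) {
            lineNo++;
            if (line.indexOf('\t') >= 0) {
              System.out.println(p + ":" + lineNo + ": hard tab");
            }
          }
        } catch (IOException e) {
          throw new RuntimeException(p.toString(), e);
        }
      }
    }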

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1649899&r1=1649898&r2=1649899&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Tue Jan  6 18:45:10 2015
@@ -136,65 +136,65 @@ public class SparkReduceRecordHandler ex
       valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()];
 
       if (vectorized) {
-	final int maxTags = gWork.getTagToValueDesc().size();
-	keyStructInspector = (StructObjectInspector) keyObjectInspector;
-	batches = new VectorizedRowBatch[maxTags];
-	valueStructInspectors = new StructObjectInspector[maxTags];
-	valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
-	keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
-	buffer = new DataOutputBuffer();
+        final int maxTags = gWork.getTagToValueDesc().size();
+        keyStructInspector = (StructObjectInspector) keyObjectInspector;
+        batches = new VectorizedRowBatch[maxTags];
+        valueStructInspectors = new StructObjectInspector[maxTags];
+        valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
       }
 
       for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
-	// We should initialize the SerDe with the TypeInfo when available.
-	valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
-	inputValueDeserializer[tag] = ReflectionUtils.newInstance(
-	    valueTableDesc[tag].getDeserializerClass(), null);
-	SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
-	    valueTableDesc[tag].getProperties(), null);
-	valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
-
-	ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-
-	if (vectorized) {
-	  /* vectorization only works with struct object inspectors */
-	  valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
-
-	  batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-	      valueStructInspectors[tag]);
-	  final int totalColumns = keysColumnOffset
-	      + valueStructInspectors[tag].getAllStructFieldRefs().size();
-	  valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
-	  valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
-	      .genVectorStructExpressionWritables(keyStructInspector)));
-	  valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
-	      .genVectorStructExpressionWritables(valueStructInspectors[tag])));
-
-	  /*
-	   * The row object inspector used by ReduceWork needs to be a
-	   * **standard** struct object inspector, not just any struct object
-	   * inspector.
-	   */
-	  ArrayList<String> colNames = new ArrayList<String>();
-	  List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-	  for (StructField field : fields) {
-	    colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-	    ois.add(field.getFieldObjectInspector());
-	  }
-	  fields = valueStructInspectors[tag].getAllStructFieldRefs();
-	  for (StructField field : fields) {
-	    colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-	    ois.add(field.getFieldObjectInspector());
-	  }
-	  rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-	      colNames, ois);
-	} else {
-	  ois.add(keyObjectInspector);
-	  ois.add(valueObjectInspector[tag]);
-	  //reducer.setGroupKeyObjectInspector(keyObjectInspector);
-	  rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-	      Utilities.reduceFieldNameList, ois);
-	}
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+            valueTableDesc[tag].getProperties(), null);
+        valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+        if (vectorized) {
+          /* vectorization only works with struct object inspectors */
+          valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
+
+          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+              valueStructInspectors[tag]);
+          final int totalColumns = keysColumnOffset
+              + valueStructInspectors[tag].getAllStructFieldRefs().size();
+          valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
+          valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+              .genVectorStructExpressionWritables(keyStructInspector)));
+          valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+              .genVectorStructExpressionWritables(valueStructInspectors[tag])));
+
+          /*
+           * The row object inspector used by ReduceWork needs to be a
+           * **standard** struct object inspector, not just any struct object
+           * inspector.
+           */
+          ArrayList<String> colNames = new ArrayList<String>();
+          List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+          for (StructField field : fields) {
+            colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          fields = valueStructInspectors[tag].getAllStructFieldRefs();
+          for (StructField field : fields) {
+            colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+              colNames, ois);
+        } else {
+          ois.add(keyObjectInspector);
+          ois.add(valueObjectInspector[tag]);
+          //reducer.setGroupKeyObjectInspector(keyObjectInspector);
+          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+              Utilities.reduceFieldNameList, ois);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
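
The hunk above is pure reindentation, but the setup it passes through is worth a note: for each tag, the reduce-side row inspector is built as a standard struct object inspector whose columns are the key fields prefixed with KEY. and the value fields prefixed with VALUE., since ReduceWork requires a standard struct inspector rather than an arbitrary one. A minimal sketch against the Hive serde2 API (the column names and primitive inspectors here are illustrative, not taken from this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    /** Illustrative only: build a KEY./VALUE.-prefixed standard struct inspector. */
    class RowInspectorSketch {
      static StructObjectInspector keyValueRowInspector() {
        List<String> colNames = new ArrayList<String>();
        List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
        colNames.add("KEY.reducesinkkey0");                                // example key column
        ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        colNames.add("VALUE._col0");                                       // example value column
        ois.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        // ReduceWork needs a *standard* struct inspector, not just any struct inspector.
        return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
      }
    }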
@@ -283,19 +283,20 @@ public class SparkReduceRecordHandler ex
       }
       /* this.keyObject passed via reference */
       if (vectorized) {
-	processVectors(values, tag);
+        processVectors(values, tag);
       } else {
-	processKeyValues(values, tag);
+        processKeyValues(values, tag);
       }
 
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
-	// Don't create a new object if we are already out of memory
-	throw (OutOfMemoryError) e;
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-	LOG.fatal(StringUtils.stringifyException(e));
-	throw new RuntimeException(e);
+        String msg = "Fatal error: " + e;
+        LOG.fatal(msg, e);
+        throw new RuntimeException(e);
       }
     }
   }
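
This hunk carries the one functional change in the commit: the fatal path now passes the Throwable to LOG.fatal instead of logging a stringified copy, which lets the logging framework render the stack trace itself, and OutOfMemoryError is still rethrown unwrapped because constructing a new exception while out of memory can itself fail. A standalone sketch of the same pattern (simplified; the Runnable stands in for the real work):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    /** Sketch of the fatal-error pattern above; the Runnable is a stand-in for real work. */
    class FatalErrorHandling {
      private static final Log LOG = LogFactory.getLog(FatalErrorHandling.class);

      static void run(Runnable work) {
        try {
          work.run();
        } catch (Throwable t) {
          if (t instanceof OutOfMemoryError) {
            // Rethrow as-is: allocating a wrapper while out of memory may itself fail.
            throw (OutOfMemoryError) t;
          }
          // Pass t itself (not a stringified copy) so the logger keeps the stack trace.
          LOG.fatal("Fatal error: " + t, t);
          throw new RuntimeException(t);
        }
      }
    }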
@@ -305,43 +306,39 @@ public class SparkReduceRecordHandler ex
    * @return true if it is not done and can take more inputs
    */
   private boolean processKeyValues(Iterator values, byte tag) throws HiveException {
-
-	// System.err.print(keyObject.toString());
-      while (values.hasNext()) {
-        BytesWritable valueWritable = (BytesWritable) values.next();
-        // System.err.print(who.getHo().toString());
-        try {
-          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
-        } catch (SerDeException e) {
-          throw new HiveException(
-            "Hive Runtime Error: Unable to deserialize reduce input value (tag="
-              + tag
-              + ") from "
-              + Utilities.formatBinaryString(valueWritable.get(), 0,
-              valueWritable.getSize()) + " with properties "
-              + valueTableDesc[tag].getProperties(), e);
-        }
-        row.clear();
-        row.add(keyObject);
-        row.add(valueObject[tag]);
-        if (isLogInfoEnabled) {
-            logMemoryInfo();
-        }
+    while (values.hasNext()) {
+      BytesWritable valueWritable = (BytesWritable) values.next();
+      try {
+        valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
+      } catch (SerDeException e) {
+        throw new HiveException(
+          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+            + tag
+            + ") from "
+            + Utilities.formatBinaryString(valueWritable.get(), 0,
+            valueWritable.getSize()) + " with properties "
+            + valueTableDesc[tag].getProperties(), e);
+      }
+      row.clear();
+      row.add(keyObject);
+      row.add(valueObject[tag]);
+      if (isLogInfoEnabled) {
+        logMemoryInfo();
+      }
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
         try {
-          reducer.processOp(row, tag);
-        } catch (Exception e) {
-          String rowString = null;
-          try {
-            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
-          } catch (Exception e2) {
-            rowString = "[Error getting row data with exception " +
-              StringUtils.stringifyException(e2) + " ]";
-          }
-          throw new HiveException("Hive Runtime Error while processing row (tag="
-            + tag + ") " + rowString, e);
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception " +
+            StringUtils.stringifyException(e2) + " ]";
         }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+          + tag + ") " + rowString, e);
       }
-
+    }
     return true; // give me more
   }
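
Beyond reindenting processKeyValues and dropping two commented-out System.err debug lines, the hunk preserves a useful diagnostic habit: a failed deserialize is wrapped in a HiveException carrying the tag, a printable rendering of the raw bytes, and the table properties, so a bad record can be identified from the message alone. A simplified standalone sketch of the idea (the byte formatting below is illustrative, not Utilities.formatBinaryString):

    import java.nio.charset.StandardCharsets;

    /** Illustrative only: wrap deserialization failures with enough context to find the bad record. */
    class DeserializeDiagnostics {
      static Object deserializeOrExplain(byte[] raw, int tag) {
        try {
          return deserialize(raw);                         // stand-in for a real SerDe call
        } catch (Exception e) {
          StringBuilder printable = new StringBuilder();
          for (byte b : raw) {
            printable.append(String.format("\\%03d", b & 0xff));
          }
          throw new RuntimeException(
              "Unable to deserialize reduce input value (tag=" + tag + ") from " + printable, e);
        }
      }

      private static Object deserialize(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);    // stand-in for a real SerDe
      }
    }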
 
@@ -362,41 +359,41 @@ public class SparkReduceRecordHandler ex
     int rowIdx = 0;
     try {
       while (values.hasNext()) {
-	/* deserialize value into columns */
-	BytesWritable valueWritable = (BytesWritable) values.next();
-	Object valueObj = deserializeValue(valueWritable, tag);
-
-	VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
-	    keysColumnOffset, batch, buffer);
-	rowIdx++;
-	if (rowIdx >= BATCH_SIZE) {
-	  VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-	  reducer.processOp(batch, tag);
-	  rowIdx = 0;
-	  if (isLogInfoEnabled) {
-	    logMemoryInfo();
-	  }
-	}
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) values.next();
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
+            keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+          if (isLogInfoEnabled) {
+            logMemoryInfo();
+          }
+        }
       }
       if (rowIdx > 0) {
-	VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-	reducer.processOp(batch, tag);
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
       }
       if (isLogInfoEnabled) {
-	logMemoryInfo();
+        logMemoryInfo();
       }
     } catch (Exception e) {
       String rowString = null;
       try {
-	/* batch.toString depends on this */
-	batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
-	rowString = batch.toString();
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
       } catch (Exception e2) {
-	rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
-	    + " ]";
+        rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
+          + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + tag + ") "
-	  + rowString, e);
+        + rowString, e);
     }
     return true; // give me more
   }
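
processVectors shows the usual vectorized flush discipline: rows accumulate into a reused batch, the batch is handed to the operator once it reaches BATCH_SIZE, and a final partial batch is flushed after the loop. A generic sketch of the same discipline (the BATCH_SIZE of 1024 mirrors Hive's default VectorizedRowBatch size; BatchConsumer stands in for the reducer):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    /** Sketch of the fill/flush/reset batching pattern; consumers must use each batch synchronously. */
    class BatchingSketch {
      static final int BATCH_SIZE = 1024;     // mirrors Hive's default vectorized batch size

      interface BatchConsumer<T> { void process(List<T> batch); }

      static <T> void drain(Iterator<T> values, BatchConsumer<T> consumer) {
        List<T> batch = new ArrayList<T>(BATCH_SIZE);
        while (values.hasNext()) {
          batch.add(values.next());
          if (batch.size() >= BATCH_SIZE) {
            consumer.process(batch);          // flush a full batch
            batch.clear();                    // reuse the buffer, as the record handler reuses its batch
          }
        }
        if (!batch.isEmpty()) {
          consumer.process(batch);            // flush the final partial batch
        }
      }
    }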
@@ -406,9 +403,9 @@ public class SparkReduceRecordHandler ex
       return inputValueDeserializer[tag].deserialize(valueWritable);
     } catch (SerDeException e) {
       throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input value (tag="
-	  + tag + ") from "
-	  + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
-	  + " with properties " + valueTableDesc[tag].getProperties(), e);
+        + tag + ") from "
+        + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+        + " with properties " + valueTableDesc[tag].getProperties(), e);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1649899&r1=1649898&r2=1649899&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Tue Jan  6 18:45:10 2015
@@ -313,14 +313,14 @@ public class Vectorizer implements Physi
           }
         }
       } else if (currTask instanceof SparkTask) {
-	SparkWork sparkWork = (SparkWork) currTask.getWork();
-	for (BaseWork baseWork : sparkWork.getAllWork()) {
-	  if (baseWork instanceof MapWork) {
-	    convertMapWork((MapWork) baseWork, false);
-	  } else if (baseWork instanceof ReduceWork) {
-	    convertReduceWork((ReduceWork) baseWork);
-	  }
-	}
+        SparkWork sparkWork = (SparkWork) currTask.getWork();
+        for (BaseWork baseWork : sparkWork.getAllWork()) {
+          if (baseWork instanceof MapWork) {
+            convertMapWork((MapWork) baseWork, false);
+          } else if (baseWork instanceof ReduceWork) {
+            convertReduceWork((ReduceWork) baseWork);
+          }
+        }
       }
       return null;
     }
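
The Vectorizer hunk is again reindentation only; the logic it covers walks a SparkTask's SparkWork and converts each contained work unit by type, MapWork and ReduceWork each through its own path. A minimal sketch of that dispatch (BaseWork, MapWork, and ReduceWork below are local stand-ins, not the Hive classes):

    /** Stand-in types only: sketch of the per-work dispatch in the Vectorizer hunk. */
    class WorkDispatchSketch {
      interface BaseWork {}
      static class MapWork implements BaseWork {}
      static class ReduceWork implements BaseWork {}

      static void convertAll(Iterable<BaseWork> allWork) {
        for (BaseWork w : allWork) {
          if (w instanceof MapWork) {
            convertMapWork((MapWork) w);
          } else if (w instanceof ReduceWork) {
            convertReduceWork((ReduceWork) w);
          }
          // other work types are left untouched
        }
      }

      static void convertMapWork(MapWork w) { /* vectorize map-side operators */ }
      static void convertReduceWork(ReduceWork w) { /* vectorize reduce-side operators */ }
    }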