Posted to commits@hive.apache.org by br...@apache.org on 2015/01/06 19:45:11 UTC
svn commit: r1649899 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkReduceRecordHandler.java optimizer/physical/Vectorizer.java
Author: brock
Date: Tue Jan 6 18:45:10 2015
New Revision: 1649899
URL: http://svn.apache.org/r1649899
Log:
HIVE-9246 - Remove tabs from spark code [Spark Branch] (Brock reviewed by Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1649899&r1=1649898&r2=1649899&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Tue Jan 6 18:45:10 2015
@@ -136,65 +136,65 @@ public class SparkReduceRecordHandler ex
valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()];
if (vectorized) {
- final int maxTags = gWork.getTagToValueDesc().size();
- keyStructInspector = (StructObjectInspector) keyObjectInspector;
- batches = new VectorizedRowBatch[maxTags];
- valueStructInspectors = new StructObjectInspector[maxTags];
- valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
- keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
- buffer = new DataOutputBuffer();
+ final int maxTags = gWork.getTagToValueDesc().size();
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ batches = new VectorizedRowBatch[maxTags];
+ valueStructInspectors = new StructObjectInspector[maxTags];
+ valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
+ keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+ buffer = new DataOutputBuffer();
}
for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
- // We should initialize the SerDe with the TypeInfo when available.
- valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
- inputValueDeserializer[tag] = ReflectionUtils.newInstance(
- valueTableDesc[tag].getDeserializerClass(), null);
- SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
- valueTableDesc[tag].getProperties(), null);
- valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
-
- ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-
- if (vectorized) {
- /* vectorization only works with struct object inspectors */
- valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
-
- batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
- valueStructInspectors[tag]);
- final int totalColumns = keysColumnOffset
- + valueStructInspectors[tag].getAllStructFieldRefs().size();
- valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
- valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
- .genVectorStructExpressionWritables(keyStructInspector)));
- valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
- .genVectorStructExpressionWritables(valueStructInspectors[tag])));
-
- /*
- * The row object inspector used by ReduceWork needs to be a
- * **standard** struct object inspector, not just any struct object
- * inspector.
- */
- ArrayList<String> colNames = new ArrayList<String>();
- List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
- for (StructField field : fields) {
- colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- fields = valueStructInspectors[tag].getAllStructFieldRefs();
- for (StructField field : fields) {
- colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
- colNames, ois);
- } else {
- ois.add(keyObjectInspector);
- ois.add(valueObjectInspector[tag]);
- //reducer.setGroupKeyObjectInspector(keyObjectInspector);
- rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
- Utilities.reduceFieldNameList, ois);
- }
+ // We should initialize the SerDe with the TypeInfo when available.
+ valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
+ inputValueDeserializer[tag] = ReflectionUtils.newInstance(
+ valueTableDesc[tag].getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+ valueTableDesc[tag].getProperties(), null);
+ valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+ if (vectorized) {
+ /* vectorization only works with struct object inspectors */
+ valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
+
+ batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+ valueStructInspectors[tag]);
+ final int totalColumns = keysColumnOffset
+ + valueStructInspectors[tag].getAllStructFieldRefs().size();
+ valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
+ valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+ .genVectorStructExpressionWritables(keyStructInspector)));
+ valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+ .genVectorStructExpressionWritables(valueStructInspectors[tag])));
+
+ /*
+ * The row object inspector used by ReduceWork needs to be a
+ * **standard** struct object inspector, not just any struct object
+ * inspector.
+ */
+ ArrayList<String> colNames = new ArrayList<String>();
+ List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+ for (StructField field : fields) {
+ colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ fields = valueStructInspectors[tag].getAllStructFieldRefs();
+ for (StructField field : fields) {
+ colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+ colNames, ois);
+ } else {
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector[tag]);
+ //reducer.setGroupKeyObjectInspector(keyObjectInspector);
+ rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+ Utilities.reduceFieldNameList, ois);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -283,19 +283,20 @@ public class SparkReduceRecordHandler ex
}
/* this.keyObject passed via reference */
if (vectorized) {
- processVectors(values, tag);
+ processVectors(values, tag);
} else {
- processKeyValues(values, tag);
+ processKeyValues(values, tag);
}
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- LOG.fatal(StringUtils.stringifyException(e));
- throw new RuntimeException(e);
+ String msg = "Fatal error: " + e;
+ LOG.fatal(msg, e);
+ throw new RuntimeException(e);
}
}
}
@@ -305,43 +306,39 @@ public class SparkReduceRecordHandler ex
* @return true if it is not done and can take more inputs
*/
private boolean processKeyValues(Iterator values, byte tag) throws HiveException {
-
- // System.err.print(keyObject.toString());
- while (values.hasNext()) {
- BytesWritable valueWritable = (BytesWritable) values.next();
- // System.err.print(who.getHo().toString());
- try {
- valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
- } catch (SerDeException e) {
- throw new HiveException(
- "Hive Runtime Error: Unable to deserialize reduce input value (tag="
- + tag
- + ") from "
- + Utilities.formatBinaryString(valueWritable.get(), 0,
- valueWritable.getSize()) + " with properties "
- + valueTableDesc[tag].getProperties(), e);
- }
- row.clear();
- row.add(keyObject);
- row.add(valueObject[tag]);
- if (isLogInfoEnabled) {
- logMemoryInfo();
- }
+ while (values.hasNext()) {
+ BytesWritable valueWritable = (BytesWritable) values.next();
+ try {
+ valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
+ } catch (SerDeException e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+ + tag
+ + ") from "
+ + Utilities.formatBinaryString(valueWritable.get(), 0,
+ valueWritable.getSize()) + " with properties "
+ + valueTableDesc[tag].getProperties(), e);
+ }
+ row.clear();
+ row.add(keyObject);
+ row.add(valueObject[tag]);
+ if (isLogInfoEnabled) {
+ logMemoryInfo();
+ }
+ try {
+ reducer.processOp(row, tag);
+ } catch (Exception e) {
+ String rowString = null;
try {
- reducer.processOp(row, tag);
- } catch (Exception e) {
- String rowString = null;
- try {
- rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
- } catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
- }
- throw new HiveException("Hive Runtime Error while processing row (tag="
- + tag + ") " + rowString, e);
+ rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception " +
+ StringUtils.stringifyException(e2) + " ]";
}
+ throw new HiveException("Hive Runtime Error while processing row (tag="
+ + tag + ") " + rowString, e);
}
-
+ }
return true; // give me more
}
@@ -362,41 +359,41 @@ public class SparkReduceRecordHandler ex
int rowIdx = 0;
try {
while (values.hasNext()) {
- /* deserialize value into columns */
- BytesWritable valueWritable = (BytesWritable) values.next();
- Object valueObj = deserializeValue(valueWritable, tag);
-
- VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
- keysColumnOffset, batch, buffer);
- rowIdx++;
- if (rowIdx >= BATCH_SIZE) {
- VectorizedBatchUtil.setBatchSize(batch, rowIdx);
- reducer.processOp(batch, tag);
- rowIdx = 0;
- if (isLogInfoEnabled) {
- logMemoryInfo();
- }
- }
+ /* deserialize value into columns */
+ BytesWritable valueWritable = (BytesWritable) values.next();
+ Object valueObj = deserializeValue(valueWritable, tag);
+
+ VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
+ keysColumnOffset, batch, buffer);
+ rowIdx++;
+ if (rowIdx >= BATCH_SIZE) {
+ VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+ reducer.processOp(batch, tag);
+ rowIdx = 0;
+ if (isLogInfoEnabled) {
+ logMemoryInfo();
+ }
+ }
}
if (rowIdx > 0) {
- VectorizedBatchUtil.setBatchSize(batch, rowIdx);
- reducer.processOp(batch, tag);
+ VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+ reducer.processOp(batch, tag);
}
if (isLogInfoEnabled) {
- logMemoryInfo();
+ logMemoryInfo();
}
} catch (Exception e) {
String rowString = null;
try {
- /* batch.toString depends on this */
- batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
- rowString = batch.toString();
+ /* batch.toString depends on this */
+ batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
+ rowString = batch.toString();
} catch (Exception e2) {
- rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
- + " ]";
+ rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
+ + " ]";
}
throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + tag + ") "
- + rowString, e);
+ + rowString, e);
}
return true; // give me more
}
@@ -406,9 +403,9 @@ public class SparkReduceRecordHandler ex
return inputValueDeserializer[tag].deserialize(valueWritable);
} catch (SerDeException e) {
throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input value (tag="
- + tag + ") from "
- + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
- + " with properties " + valueTableDesc[tag].getProperties(), e);
+ + tag + ") from "
+ + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+ + " with properties " + valueTableDesc[tag].getProperties(), e);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1649899&r1=1649898&r2=1649899&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Tue Jan 6 18:45:10 2015
@@ -313,14 +313,14 @@ public class Vectorizer implements Physi
}
}
} else if (currTask instanceof SparkTask) {
- SparkWork sparkWork = (SparkWork) currTask.getWork();
- for (BaseWork baseWork : sparkWork.getAllWork()) {
- if (baseWork instanceof MapWork) {
- convertMapWork((MapWork) baseWork, false);
- } else if (baseWork instanceof ReduceWork) {
- convertReduceWork((ReduceWork) baseWork);
- }
- }
+ SparkWork sparkWork = (SparkWork) currTask.getWork();
+ for (BaseWork baseWork : sparkWork.getAllWork()) {
+ if (baseWork instanceof MapWork) {
+ convertMapWork((MapWork) baseWork, false);
+ } else if (baseWork instanceof ReduceWork) {
+ convertReduceWork((ReduceWork) baseWork);
+ }
+ }
}
return null;
}