Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [16/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Sep 8 04:38:17 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -29,12 +30,15 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -100,6 +105,10 @@ public class FileSinkOperator extends Te
private transient List<Object> keyWritables;
private transient List<String> keys;
private transient int numKeyColToRead;
+ private StructField recIdField; // field to find record identifier in
+ private StructField bucketField; // field within the record id that holds the bucket
+ private StructObjectInspector recIdInspector; // OI for inspecting record id
+ private IntObjectInspector bucketInspector; // OI for inspecting bucket id
/**
* RecordWriter.
@@ -117,7 +126,10 @@ public class FileSinkOperator extends Te
Path[] outPaths;
Path[] finalPaths;
RecordWriter[] outWriters;
+ RecordUpdater[] updaters;
Stat stat;
+ int acidLastBucket = -1;
+ int acidFileOffset = -1;
public FSPaths() {
}
@@ -128,6 +140,8 @@ public class FileSinkOperator extends Te
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
+ updaters = new RecordUpdater[numFiles];
+ LOG.debug("Created slots for " + numFiles);
stat = new Stat();
}
@@ -168,6 +182,15 @@ public class FileSinkOperator extends Te
}
}
}
+ try {
+ for (int i = 0; i < updaters.length; i++) {
+ if (updaters[i] != null) {
+ updaters[i].close(abort);
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
}
private void commit(FileSystem fs) throws HiveException {
@@ -177,7 +200,21 @@ public class FileSinkOperator extends Te
&& !fs.exists(finalPaths[idx].getParent())) {
fs.mkdirs(finalPaths[idx].getParent());
}
- if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+ boolean needToRename = true;
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // If we're updating or deleting there may be no file to close. This can happen
+ // because the where clause strained out all of the records for a given bucket. So
+ // before attempting the rename below, check if our file exists. If it doesn't,
+ // then skip the rename. If it does, try it. We could just blindly try the rename
+ // and avoid the extra stat, but that would mask other errors.
+ try {
+ FileStatus stat = fs.getFileStatus(outPaths[idx]);
+ } catch (FileNotFoundException fnfe) {
+ needToRename = false;
+ }
+ }
+ if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) {
throw new HiveException("Unable to rename output from: " +
outPaths[idx] + " to: " + finalPaths[idx]);
}
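The guarded rename above boils down to a stat-before-rename check. A minimal, self-contained sketch of the same pattern (the helper name and signature are illustrative, not part of the patch):

  import java.io.FileNotFoundException;
  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Move src to dst, tolerating the case where src was never created because the
  // UPDATE/DELETE matched no rows for this bucket. Returns false when there was nothing to move.
  static boolean moveIfPresent(FileSystem fs, Path src, Path dst) throws IOException {
    try {
      fs.getFileStatus(src);               // existence probe; throws FileNotFoundException if absent
    } catch (FileNotFoundException fnfe) {
      return false;
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Unable to rename " + src + " to " + dst);
    }
    return true;
  }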
@@ -350,6 +387,16 @@ public class FileSinkOperator extends Te
valToPaths.put("", fsp); // special entry for non-DP case
}
}
+
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // ROW__ID is always in the first field
+ recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0);
+ recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+ // bucket is the second field in the record id
+ bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+ bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+ }
initializeChildren(hconf);
} catch (HiveException e) {
throw e;
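For context on the inspectors set up above: in the ACID write path the row reaching this operator carries its record identifier (ROW__ID) as the first struct field, and the bucket id is the second member inside that identifier, roughly (member names as assumed here):

  row     : struct< ROW__ID, userCol1, userCol2, ... >
  ROW__ID : struct< transactionId:bigint, bucketId:int, rowId:bigint >

which is why recIdField is field 0 of the output inspector and bucketField is field 1 of recIdInspector.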
@@ -420,6 +467,7 @@ public class FileSinkOperator extends Te
assert totalFiles == 1;
}
+ int bucketNum = 0;
if (multiFileSpray) {
key.setHashCode(idx);
@@ -436,7 +484,7 @@ public class FileSinkOperator extends Te
}
}
- int bucketNum = prtner.getBucket(key, null, totalFiles);
+ bucketNum = prtner.getBucket(key, null, totalFiles);
if (seenBuckets.contains(bucketNum)) {
continue;
}
@@ -462,7 +510,8 @@ public class FileSinkOperator extends Te
filesCreated = true;
}
- protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
+ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
+ throws HiveException {
try {
if (isNativeTable) {
fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
@@ -493,11 +542,21 @@ public class FileSinkOperator extends Te
Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
// only create bucket files if no dynamic partitions,
// buckets of dynamic partitions will be created for each newly created partition
- fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
- outputClass, conf, fsp.outPaths[filesIdx], reporter);
- // If the record writer provides stats, get it from there instead of the serde
- statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
- // increment the CREATED_FILES counter
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
+ outputClass, conf, fsp.outPaths[filesIdx], reporter);
+ // If the record writer provides stats, get it from there instead of the serde
+ statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
+ StatsProvidingRecordWriter;
+ // increment the CREATED_FILES counter
+ } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+ // Only set up the updater for insert. For update and delete we don't know until we see
+ // the row.
+ ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
+ int acidBucketNum = Integer.valueOf(Utilities.getTaskIdFromFilename(taskId));
+ fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
+ acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+ }
if (reporter != null) {
reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
Operator.HIVECOUNTERCREATEDFILES, 1);
@@ -598,27 +657,47 @@ public class FileSinkOperator extends Te
}
- RecordWriter rowOutWriter = null;
-
if (row_count != null) {
row_count.set(row_count.get() + 1);
}
- if (!multiFileSpray) {
- rowOutWriter = rowOutWriters[0];
+ int writerOffset = findWriterOffset(row);
+ // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
+ // for a given operator, branch prediction should work quite nicely on it.
+ // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
+ // pass the row rather than recordValue.
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ rowOutWriters[writerOffset].write(recordValue);
+ } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+ fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
} else {
- int keyHashCode = 0;
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
- }
- key.setHashCode(keyHashCode);
- int bucketNum = prtner.getBucket(key, null, totalFiles);
- int idx = bucketMap.get(bucketNum);
- rowOutWriter = rowOutWriters[idx];
+ // TODO I suspect we could skip much of the stuff above this in the function in the case
+ // of update and delete. But I don't understand all of the side effects of the above
+ // code and don't want to skip over it yet.
+
+ // Find the bucket id, and switch buckets if need to
+ ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
+ Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
+ int bucketNum =
+ bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
+ if (fpaths.acidLastBucket != bucketNum) {
+ fpaths.acidLastBucket = bucketNum;
+ // Switch files
+ fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+ jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+ rowInspector, reporter, 0);
+ LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+ fpaths.outPaths[fpaths.acidFileOffset]);
+ }
+
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+ fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+ } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+ } else {
+ throw new HiveException("Unknown write type " + conf.getWriteType().toString());
+ }
}
- rowOutWriter.write(recordValue);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
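The least obvious step in the update/delete branch above is pulling the bucket id out of the row via object inspectors. A self-contained sketch of just that step (the method name is illustrative, and it assumes ROW__ID is the row's first field as noted earlier):

  import org.apache.hadoop.hive.serde2.objectinspector.StructField;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;

  // Read the bucket id from a row whose first field is the record identifier (ROW__ID).
  static int bucketOf(Object row, StructObjectInspector rowInspector) {
    StructField recIdField = rowInspector.getAllStructFieldRefs().get(0);      // ROW__ID
    StructObjectInspector recIdOI =
        (StructObjectInspector) recIdField.getFieldObjectInspector();
    StructField bucketField = recIdOI.getAllStructFieldRefs().get(1);          // bucket id within ROW__ID
    IntObjectInspector bucketOI = (IntObjectInspector) bucketField.getFieldObjectInspector();
    Object recId = rowInspector.getStructFieldData(row, recIdField);
    return bucketOI.get(recIdOI.getStructFieldData(recId, bucketField));
  }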
@@ -627,6 +706,11 @@ public class FileSinkOperator extends Te
}
protected boolean areAllTrue(boolean[] statsFromRW) {
+ // If we are doing an acid operation, they will always all be true, as RecordUpdaters always
+ // collect stats
+ if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID) {
+ return true;
+ }
for(boolean b : statsFromRW) {
if (!b) {
return false;
@@ -635,6 +719,23 @@ public class FileSinkOperator extends Te
return true;
}
+ private int findWriterOffset(Object row) throws HiveException {
+ if (!multiFileSpray) {
+ return 0;
+ } else {
+ int keyHashCode = 0;
+ for (int i = 0; i < partitionEval.length; i++) {
+ Object o = partitionEval[i].evaluate(row);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ }
+ key.setHashCode(keyHashCode);
+ int bucketNum = prtner.getBucket(key, null, totalFiles);
+ return bucketMap.get(bucketNum);
+ }
+
+ }
+
/**
* Lookup list bucketing path.
* @param lbDirName
@@ -727,14 +828,16 @@ public class FileSinkOperator extends Te
FSPaths fp;
// get the path corresponding to the dynamic partition columns,
- String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
+ String dpDir = getDynPartDirectory(row, dpColNames);
String pathKey = null;
if (dpDir != null) {
dpDir = appendToSource(lbDirName, dpDir);
pathKey = dpDir;
+ int numericBucketNum = 0;
if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
String buckNum = row.get(row.size() - 1);
+ numericBucketNum = Integer.valueOf(buckNum);
taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
pathKey = appendToSource(taskId, dpDir);
}
@@ -756,13 +859,18 @@ public class FileSinkOperator extends Te
// since we are closing the previous fsp's record writers, we need to see if we can get
// stats from the record writer and store in the previous fsp that is cached
if (conf.isGatherStats() && isCollectRWStats) {
- RecordWriter outWriter = prevFsp.outWriters[0];
- if (outWriter != null) {
- SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
- if (stats != null) {
+ SerDeStats stats = null;
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ RecordWriter outWriter = prevFsp.outWriters[0];
+ if (outWriter != null) {
+ stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ }
+ } else if (prevFsp.updaters[0] != null) {
+ stats = prevFsp.updaters[0].getStats();
+ }
+ if (stats != null) {
prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
- }
}
}
@@ -805,8 +913,7 @@ public class FileSinkOperator extends Te
// given the current input row, the mapping for input col info to dp columns, and # of dp cols,
// return the relative path corresponding to the row.
// e.g., ds=2008-04-08/hr=11
- private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
- assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
+ private String getDynPartDirectory(List<String> row, List<String> dpColNames) {
return FileUtils.makePartName(dpColNames, row);
}
@@ -832,6 +939,7 @@ public class FileSinkOperator extends Te
@Override
public void closeOp(boolean abort) throws HiveException {
+
if (!bDynParts && !filesCreated) {
createBucketFiles(fsp);
}
@@ -849,13 +957,25 @@ public class FileSinkOperator extends Te
// record writer already gathers the statistics, it can simply return the
// accumulated statistics which will be aggregated in case of spray writers
if (conf.isGatherStats() && isCollectRWStats) {
- for (int idx = 0; idx < fsp.outWriters.length; idx++) {
- RecordWriter outWriter = fsp.outWriters[idx];
- if (outWriter != null) {
- SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
- if (stats != null) {
- fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
- fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+ RecordWriter outWriter = fsp.outWriters[idx];
+ if (outWriter != null) {
+ SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
+ }
+ }
+ } else {
+ for (int i = 0; i < fsp.updaters.length; i++) {
+ if (fsp.updaters[i] != null) {
+ SerDeStats stats = fsp.updaters[i].getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Mon Sep 8 04:38:17 2014
@@ -39,7 +39,6 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Function;
@@ -562,7 +561,7 @@ public final class FunctionRegistry {
return null;
}
- Class<?> udfClass = Class.forName(func.getClassName(), true, JavaUtils.getClassLoader());
+ Class<?> udfClass = Class.forName(func.getClassName(), true, Utilities.getSessionSpecifiedClassLoader());
if (registerTemporaryFunction(functionName, udfClass)) {
ret = mFunctions.get(functionName);
} else {
@@ -610,7 +609,7 @@ public final class FunctionRegistry {
// Even if we have a reference to the class (which will be the case for GenericUDFs),
// the classloader may not be able to resolve the class, which would mean reflection-based
// methods would fail such as for plan deserialization. Make sure this works too.
- Class.forName(udfClass.getName(), true, JavaUtils.getClassLoader());
+ Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
}
private static void loadFunctionResourcesIfNecessary(String functionName, CommonFunctionInfo cfi) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Mon Sep 8 04:38:17 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
import static org.apache.hadoop.util.StringUtils.stringifyException;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -33,10 +32,8 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FunctionUtils.FunctionType;
import org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -47,10 +44,6 @@ import org.apache.hadoop.hive.ql.plan.Dr
import org.apache.hadoop.hive.ql.plan.FunctionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
@@ -308,9 +301,10 @@ public class FunctionTask extends Task<F
}
}
- @SuppressWarnings("unchecked")
private Class<?> getUdfClass(CreateFunctionDesc desc) throws ClassNotFoundException {
- return Class.forName(desc.getClassName(), true, JavaUtils.getClassLoader());
+ // get the session specified class loader from SessionState
+ ClassLoader classLoader = Utilities.getSessionSpecifiedClassLoader();
+ return Class.forName(desc.getClassName(), true, classLoader);
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java Mon Sep 8 04:38:17 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
/**
* Class to handle heartbeats for MR and Tez tasks.
@@ -64,7 +65,8 @@ public class Heartbeater {
if (heartbeatInterval == 0) {
// Multiply the heartbeat interval by 1000 to convert to milliseconds,
// but divide by 2 to give us a safety factor.
- heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500;
+ heartbeatInterval = HiveConf.getTimeVar(
+ conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
if (heartbeatInterval == 0) {
LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
dontHeartbeat = true;
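To make the unit change concrete: assuming hive.txn.timeout is at its usual default of 300 seconds, getTimeVar(..., TimeUnit.MILLISECONDS) returns 300000 and the computed interval is 150000 ms, the same half-timeout value that the old getIntVar(...) * 500 arithmetic produced when the setting was interpreted as whole seconds.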
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java Mon Sep 8 04:38:17 2014
@@ -20,24 +20,50 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
- private Partitioner<BytesWritable, Object> partitioner
- = new TotalOrderPartitioner<BytesWritable, Object>();
+ private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
+ private Partitioner<BytesWritable, Object> partitioner;
+
+ @Override
public void configure(JobConf job) {
- JobConf newconf = new JobConf(job);
- newconf.setMapOutputKeyClass(BytesWritable.class);
- partitioner.configure(newconf);
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(job));
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ // workaround for TEZ-1403
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(conf));
+ }
}
public int getPartition(HiveKey key, Object value, int numPartitions) {
return partitioner.getPartition(key, value, numPartitions);
}
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ private void configurePartitioner(JobConf conf) {
+ LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+ partitioner.configure(conf);
+ }
}
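Implementing Configurable matters here because Tez creates the partitioner via ReflectionUtils, which calls setConf rather than the old-style configure(JobConf); that is the TEZ-1403 workaround noted in the code. A rough sketch of the producer side this class expects, with illustrative names and wiring (the real setup lives in Hive's sampling and sort code paths):

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

  // Register the partition-key file before submitting the job so configurePartitioner() can load it.
  static void registerPartitionFile(JobConf job, HiveConf hiveConf,
      PartitionKeySampler sampler, Path partitionFile) throws IOException {
    TotalOrderPartitioner.setPartitionFile(job, partitionFile);   // read back via getPartitionFile(conf)
    sampler.writePartitionKeys(partitionFile, hiveConf, job);     // may also lower numReduceTasks
    job.setPartitionerClass(HiveTotalOrderPartitioner.class);
  }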
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java Mon Sep 8 04:38:17 2014
@@ -57,7 +57,7 @@ public class ListSinkOperator extends Op
FetchFormatter fetcher;
if (formatterName != null && !formatterName.isEmpty()) {
Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true,
- JavaUtils.getClassLoader()).asSubclass(FetchFormatter.class);
+ Utilities.getSessionSpecifiedClassLoader()).asSubclass(FetchFormatter.class);
fetcher = ReflectionUtils.newInstance(fetcherClass, null);
} else {
fetcher = new DefaultFetchFormatter();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Sep 8 04:38:17 2014
@@ -104,6 +104,8 @@ public class MapJoinOperator extends Abs
cache = ObjectCacheFactory.getCache(hconf);
loader = HashTableLoaderFactory.getLoader(hconf);
+ hashMapRowGetters = null;
+
mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
hashTblInitedOnce = true;
@@ -186,7 +188,7 @@ public class MapJoinOperator extends Abs
* process different buckets and if the container is reused to join a different bucket,
* join results can be incorrect. The cache is keyed on operator id and for bucket map join
* the operator does not change but data needed is different. For a proper fix, this
- * requires changes in the Tez API with regard to finding bucket id and
+ * requires changes in the Tez API with regard to finding bucket id and
* also ability to schedule tasks to re-use containers that have cached the specific bucket.
*/
LOG.info("This is not bucket map join, so cache");
@@ -237,7 +239,7 @@ public class MapJoinOperator extends Abs
firstRow = false;
}
- alias = (byte)tag;
+ alias = (byte) tag;
if (hashMapRowGetters == null) {
hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
MapJoinKey refKey = getRefKey(alias);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Sep 8 04:38:17 2014
@@ -348,17 +348,17 @@ public class MoveTask extends Task<MoveW
// want to isolate any potential issue it may introduce.
ArrayList<LinkedHashMap<String, String>> dp =
db.loadDynamicPartitions(
- tbd.getSourcePath(),
- tbd.getTable().getTableName(),
- tbd.getPartitionSpec(),
- tbd.getReplace(),
- dpCtx.getNumDPCols(),
- tbd.getHoldDDLTime(),
- isSkewedStoredAsDirs(tbd));
+ tbd.getSourcePath(),
+ tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(),
+ tbd.getReplace(),
+ dpCtx.getNumDPCols(),
+ tbd.getHoldDDLTime(),
+ isSkewedStoredAsDirs(tbd));
if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
throw new HiveException("This query creates no partitions." +
- " To turn off this error, set hive.error.on.empty.partition=false.");
+ " To turn off this error, set hive.error.on.empty.partition=false.");
}
// for each partition spec, get the partition
@@ -412,13 +412,13 @@ public class MoveTask extends Task<MoveW
numBuckets, sortCols);
}
- dc = new DataContainer(table.getTTable(), partn.getTPartition());
- // add this partition to post-execution hook
- if (work.getOutputs() != null) {
- work.getOutputs().add(new WriteEntity(partn,
+ dc = new DataContainer(table.getTTable(), partn.getTPartition());
+ // add this partition to post-execution hook
+ if (work.getOutputs() != null) {
+ work.getOutputs().add(new WriteEntity(partn,
(tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
: WriteEntity.WriteType.INSERT)));
- }
+ }
}
}
if (SessionState.get() != null && dc != null) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Sep 8 04:38:17 2014
@@ -29,13 +29,15 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Un
* OperatorFactory.
*
*/
+@SuppressWarnings({ "rawtypes", "unchecked" })
public final class OperatorFactory {
private static final List<OpTuple> opvec;
private static final List<OpTuple> vectorOpvec;
@@ -101,6 +104,10 @@ public final class OperatorFactory {
DemuxOperator.class));
opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
MuxOperator.class));
+ opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+ AppMasterEventOperator.class));
+ opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+ AppMasterEventOperator.class));
}
static {
@@ -119,9 +126,9 @@ public final class OperatorFactory {
private static final class OpTuple<T extends OperatorDesc> {
private final Class<T> descClass;
- private final Class<? extends Operator<T>> opClass;
+ private final Class<? extends Operator<?>> opClass;
- public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+ public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
this.descClass = descClass;
this.opClass = opClass;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Mon Sep 8 04:38:17 2014
@@ -45,61 +45,60 @@ import org.apache.hadoop.hive.serde2.obj
public class PTFOperator extends Operator<PTFDesc> implements Serializable {
- private static final long serialVersionUID = 1L;
- boolean isMapOperator;
+ private static final long serialVersionUID = 1L;
+ boolean isMapOperator;
- transient KeyWrapperFactory keyWrapperFactory;
- protected transient KeyWrapper currentKeys;
- protected transient KeyWrapper newKeys;
- /*
- * for map-side invocation of PTFs, we cannot utilize the currentkeys null check
- * to decide on invoking startPartition in streaming mode. Hence this extra flag.
- */
- transient boolean firstMapRow;
- transient Configuration hiveConf;
- transient PTFInvocation ptfInvocation;
-
- /*
- * 1. Find out if the operator is invoked at Map-Side or Reduce-side
- * 2. Get the deserialized QueryDef
- * 3. Reconstruct the transient variables in QueryDef
- * 4. Create input partition to store rows coming from previous operator
- */
- @Override
- protected void initializeOp(Configuration jobConf) throws HiveException {
- hiveConf = jobConf;
- // if the parent is ExtractOperator, this invocation is from reduce-side
- isMapOperator = conf.isMapSide();
-
- reconstructQueryDef(hiveConf);
-
- if (isMapOperator) {
- PartitionedTableFunctionDef tDef = conf.getStartOfChain();
- outputObjInspector = tDef.getRawInputShape().getOI();
- } else {
- outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
- }
-
- setupKeysWrapper(inputObjInspectors[0]);
-
- ptfInvocation = setupChain();
- ptfInvocation.initializeStreaming(jobConf, isMapOperator);
- firstMapRow = true;
-
- super.initializeOp(jobConf);
- }
-
- @Override
- protected void closeOp(boolean abort) throws HiveException {
- super.closeOp(abort);
+ transient KeyWrapperFactory keyWrapperFactory;
+ protected transient KeyWrapper currentKeys;
+ protected transient KeyWrapper newKeys;
+ /*
+ * for map-side invocation of PTFs, we cannot utilize the currentkeys null check
+ * to decide on invoking startPartition in streaming mode. Hence this extra flag.
+ */
+ transient boolean firstMapRow;
+ transient Configuration hiveConf;
+ transient PTFInvocation ptfInvocation;
+
+ /*
+ * 1. Find out if the operator is invoked at Map-Side or Reduce-side
+ * 2. Get the deserialized QueryDef
+ * 3. Reconstruct the transient variables in QueryDef
+ * 4. Create input partition to store rows coming from previous operator
+ */
+ @Override
+ protected void initializeOp(Configuration jobConf) throws HiveException {
+ hiveConf = jobConf;
+ // if the parent is ExtractOperator, this invocation is from reduce-side
+ isMapOperator = conf.isMapSide();
+
+ reconstructQueryDef(hiveConf);
+
+ if (isMapOperator) {
+ PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+ outputObjInspector = tDef.getRawInputShape().getOI();
+ } else {
+ outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
+ }
+
+ setupKeysWrapper(inputObjInspectors[0]);
+
+ ptfInvocation = setupChain();
+ ptfInvocation.initializeStreaming(jobConf, isMapOperator);
+ firstMapRow = true;
+
+ super.initializeOp(jobConf);
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ super.closeOp(abort);
ptfInvocation.finishPartition();
ptfInvocation.close();
}
- @Override
- public void processOp(Object row, int tag) throws HiveException
- {
- if (!isMapOperator ) {
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ if (!isMapOperator ) {
/*
* check if the current row belongs to the currently accumulated Partition:
* - If not:
@@ -129,51 +128,51 @@ public class PTFOperator extends Operato
}
ptfInvocation.processRow(row);
- }
+ }
+
+ /**
+ * Initialize the visitor to use the QueryDefDeserializer Use the order
+ * defined in QueryDefWalker to visit the QueryDef
+ *
+ * @param hiveConf
+ * @throws HiveException
+ */
+ protected void reconstructQueryDef(Configuration hiveConf) throws HiveException {
+
+ PTFDeserializer dS =
+ new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+ dS.initializePTFChain(conf.getFuncDef());
+ }
+
+ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
+ PartitionDef pDef = conf.getStartOfChain().getPartition();
+ List<PTFExpressionDef> exprs = pDef.getExpressions();
+ int numExprs = exprs.size();
+ ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+ ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+ ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
+
+ for(int i=0; i<numExprs; i++) {
+ PTFExpressionDef exprDef = exprs.get(i);
+ /*
+ * Why cannot we just use the ExprNodeEvaluator on the column?
+ * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
+ * and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
+ */
+ keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
+ keyOIs[i] = keyFields[i].initialize(inputOI);
+ currentKeyOIs[i] =
+ ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+ ObjectInspectorCopyOption.WRITABLE);
+ }
+
+ keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
+ newKeys = keyWrapperFactory.getKeyWrapper();
+ }
- /**
- * Initialize the visitor to use the QueryDefDeserializer Use the order
- * defined in QueryDefWalker to visit the QueryDef
- *
- * @param hiveConf
- * @throws HiveException
- */
- protected void reconstructQueryDef(Configuration hiveConf) throws HiveException {
-
- PTFDeserializer dS =
- new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
- dS.initializePTFChain(conf.getFuncDef());
- }
-
- protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
- PartitionDef pDef = conf.getStartOfChain().getPartition();
- List<PTFExpressionDef> exprs = pDef.getExpressions();
- int numExprs = exprs.size();
- ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
- ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
- ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
-
- for(int i=0; i<numExprs; i++) {
- PTFExpressionDef exprDef = exprs.get(i);
- /*
- * Why cannot we just use the ExprNodeEvaluator on the column?
- * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
- * and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
- */
- keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
- keyOIs[i] = keyFields[i].initialize(inputOI);
- currentKeyOIs[i] =
- ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
- ObjectInspectorCopyOption.WRITABLE);
- }
-
- keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
- newKeys = keyWrapperFactory.getKeyWrapper();
- }
-
- /**
- * @return the name of the operator
- */
+ /**
+ * @return the name of the operator
+ */
@Override
public String getName() {
return getOperatorName();
@@ -184,11 +183,11 @@ public class PTFOperator extends Operato
}
- @Override
- public OperatorType getType() {
- return OperatorType.PTF;
- }
-
+ @Override
+ public OperatorType getType() {
+ return OperatorType.PTF;
+ }
+
private PTFInvocation setupChain() {
Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
PTFInputDef iDef = conf.getFuncDef();
@@ -197,9 +196,9 @@ public class PTFOperator extends Operato
fnDefs.push((PartitionedTableFunctionDef) iDef);
iDef = ((PartitionedTableFunctionDef) iDef).getInput();
}
-
+
PTFInvocation curr = null, first = null;
-
+
while(!fnDefs.isEmpty()) {
PartitionedTableFunctionDef currFn = fnDefs.pop();
curr = new PTFInvocation(curr, currFn.getTFunction());
@@ -222,26 +221,26 @@ public class PTFOperator extends Operato
llFn.setpItr(pItr);
}
}
-
+
/*
* Responsible for the flow of rows through the PTF Chain.
- * An Invocation wraps a TableFunction.
- * The PTFOp hands the chain each row through the processRow call.
+ * An Invocation wraps a TableFunction.
+ * The PTFOp hands the chain each row through the processRow call.
* It also notifies the chain of when a Partition starts/finishes.
- *
+ *
* There are several combinations depending
* whether the TableFunction and its successor support Streaming or Batch mode.
- *
+ *
* Combination 1: Streaming + Streaming
* - Start Partition: invoke startPartition on tabFn.
- * - Process Row: invoke process Row on tabFn.
+ * - Process Row: invoke process Row on tabFn.
* Any output rows hand to next tabFn in chain or forward to next Operator.
* - Finish Partition: invoke finishPartition on tabFn.
* Any output rows hand to next tabFn in chain or forward to next Operator.
- *
+ *
* Combination 2: Streaming + Batch
* same as Combination 1
- *
+ *
* Combination 3: Batch + Batch
* - Start Partition: create or reset the Input Partition for the tabFn
* caveat is: if prev is also batch and it is not providing an Output Iterator
@@ -251,22 +250,22 @@ public class PTFOperator extends Operato
* If function gives an Output Partition: set it on next Invocation's Input Partition
* If function gives an Output Iterator: iterate and call processRow on next Invocation.
* For last Invocation in chain: forward rows to next Operator.
- *
+ *
* Combination 3: Batch + Stream
* Similar to Combination 3, except Finish Partition behavior slightly different
* - Finish Partition : invoke evaluate on tabFn on Input Partition
* iterate output rows: hand to next tabFn in chain or forward to next Operator.
- *
+ *
*/
class PTFInvocation {
-
+
PTFInvocation prev;
PTFInvocation next;
TableFunctionEvaluator tabFn;
PTFPartition inputPart;
PTFPartition outputPart;
Iterator<Object> outputPartRowsItr;
-
+
public PTFInvocation(PTFInvocation prev, TableFunctionEvaluator tabFn) {
this.prev = prev;
this.tabFn = tabFn;
@@ -274,19 +273,19 @@ public class PTFOperator extends Operato
prev.next = this;
}
}
-
+
boolean isOutputIterator() {
return tabFn.canAcceptInputAsStream() || tabFn.canIterateOutput();
}
-
+
boolean isStreaming() {
return tabFn.canAcceptInputAsStream();
}
-
+
void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException {
PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
PTFInputDef inputDef = tabDef.getInput();
- ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
+ ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
inputObjInspectors[0] : inputDef.getOutputShape().getOI();
tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide);
@@ -295,7 +294,7 @@ public class PTFOperator extends Operato
next.initializeStreaming(cfg, isMapSide);
}
}
-
+
void startPartition() throws HiveException {
if ( isStreaming() ) {
tabFn.startPartition();
@@ -312,7 +311,7 @@ public class PTFOperator extends Operato
next.startPartition();
}
}
-
+
void processRow(Object row) throws HiveException {
if ( isStreaming() ) {
handleOutputRows(tabFn.processRow(row));
@@ -320,7 +319,7 @@ public class PTFOperator extends Operato
inputPart.append(row);
}
}
-
+
void handleOutputRows(List<Object> outRows) throws HiveException {
if ( outRows != null ) {
for (Object orow : outRows ) {
@@ -332,7 +331,7 @@ public class PTFOperator extends Operato
}
}
}
-
+
void finishPartition() throws HiveException {
if ( isStreaming() ) {
handleOutputRows(tabFn.finishPartition());
@@ -353,7 +352,7 @@ public class PTFOperator extends Operato
}
}
}
-
+
if ( next != null ) {
next.finishPartition();
} else {
@@ -364,7 +363,7 @@ public class PTFOperator extends Operato
}
}
}
-
+
/**
* Create a new Partition.
* A partition has 2 OIs: the OI for the rows being put in and the OI for the rows
@@ -388,7 +387,7 @@ public class PTFOperator extends Operato
private void createInputPartition() throws HiveException {
PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
PTFInputDef inputDef = tabDef.getInput();
- ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
+ ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
inputObjInspectors[0] : inputDef.getOutputShape().getOI();
SerDe serde = conf.isMapSide() ? tabDef.getInput().getOutputShape().getSerde() :
@@ -400,7 +399,7 @@ public class PTFOperator extends Operato
(StructObjectInspector) inputOI,
outputOI);
}
-
+
void close() {
if ( inputPart != null ) {
inputPart.close();
@@ -411,5 +410,5 @@ public class PTFOperator extends Operato
}
}
}
-
+
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Sep 8 04:38:17 2014
@@ -27,6 +27,8 @@ import java.util.Comparator;
import java.util.List;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@ import org.apache.hadoop.mapred.OutputCo
public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
+ private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
public static final Comparator<byte[]> C = new Comparator<byte[]>() {
public final int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public class PartitionKeySampler impleme
}
// sort and pick partition keys
- // copied from org.apache.hadoop.mapred.lib.InputSampler
+ // originally copied from org.apache.hadoop.mapred.lib.InputSampler but seemed to have a bug
private byte[][] getPartitionKeys(int numReduce) {
if (sampled.size() < numReduce - 1) {
throw new IllegalStateException("not enough number of sample");
}
byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
Arrays.sort(sorted, C);
- byte[][] partitionKeys = new byte[numReduce - 1][];
- float stepSize = sorted.length / (float) numReduce;
- int last = -1;
- for(int i = 1; i < numReduce; ++i) {
- int k = Math.round(stepSize * i);
- while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
- k++;
+
+ return toPartitionKeys(sorted, numReduce);
+ }
+
+ static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) {
+ byte[][] partitionKeys = new byte[numPartition - 1][];
+
+ int last = 0;
+ int current = 0;
+ for(int i = 0; i < numPartition - 1; i++) {
+ current += Math.round((float)(sorted.length - current) / (numPartition - i));
+ while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+ current++;
+ }
+ if (current >= sorted.length) {
+ return Arrays.copyOfRange(partitionKeys, 0, i);
}
- if (k >= sorted.length) {
- throw new IllegalStateException("not enough number of sample");
+ if (LOG.isDebugEnabled()) {
+ // print out nth partition key for debugging
+ LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current]));
}
- partitionKeys[i - 1] = sorted[k];
- last = k;
+ partitionKeys[i] = sorted[current];
+ last = current;
}
return partitionKeys;
}
- public void writePartitionKeys(Path path, JobConf job) throws IOException {
+ public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+ int numPartition = partitionKeys.length + 1;
+ if (numPartition != job.getNumReduceTasks()) {
+ job.setNumReduceTasks(numPartition);
+ }
FileSystem fs = path.getFileSystem(job);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
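To see the new selection loop in action: with 10 sorted samples and 4 reducers, current advances by round(10/4) = 3, round(7/3) = 2 and round(5/2) = 3, so the partition keys are the samples at indices 3, 5 and 8. If duplicate keys push current past the last sample, the key array is truncated and writePartitionKeys shrinks the reducer count to partitionKeys.length + 1 to match.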
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java Mon Sep 8 04:38:17 2014
@@ -27,14 +27,13 @@ import java.lang.annotation.Target;
import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
+@Target(ElementType.TYPE)
@Documented
-public @interface PartitionTableFunctionDescription
-{
- Description description ();
+public @interface PartitionTableFunctionDescription {
+ Description description ();
- /**
- * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function.
- */
- boolean isInternal() default false;
+ /**
+ * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function.
+ */
+ boolean isInternal() default false;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Sep 8 04:38:17 2014
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -158,12 +159,12 @@ public class ScriptOperator extends Oper
}
/**
- * Maps a relative pathname to an absolute pathname using the PATH enviroment.
+ * Maps a relative pathname to an absolute pathname using the PATH environment.
*/
public class PathFinder {
String pathenv; // a string of pathnames
- String pathSep; // the path seperator
- String fileSep; // the file seperator in a directory
+ String pathSep; // the path separator
+ String fileSep; // the file separator in a directory
/**
* Construct a PathFinder object using the path from the specified system
@@ -286,7 +287,7 @@ public class ScriptOperator extends Oper
@Override
public void processOp(Object row, int tag) throws HiveException {
- // initialize the user's process only when you recieve the first row
+ // initialize the user's process only when you receive the first row
if (firstRow) {
firstRow = false;
try {
@@ -365,7 +366,8 @@ public class ScriptOperator extends Oper
.getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
autoProgressor = new AutoProgressor(this.getClass().getName(),
reporter, Utilities.getDefaultNotificationInterval(hconf),
- HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000);
+ HiveConf.getTimeVar(
+ hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
autoProgressor.go();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Mon Sep 8 04:38:17 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.UD
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -86,7 +86,8 @@ public class UDTFOperator extends Operat
if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) {
autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
Utilities.getDefaultNotificationInterval(hconf),
- HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000);
+ HiveConf.getTimeVar(
+ hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
autoProgressor.go();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Sep 8 04:38:17 2014
@@ -92,7 +92,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -329,7 +328,9 @@ public final class Utilities {
if (!gWorkMap.containsKey(path) ||
HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
Path localPath;
- if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
+ if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
+ localPath = new Path(name);
+ } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
localPath = path;
} else {
LOG.info("***************non-local mode***************");
@@ -827,10 +828,12 @@ public final class Utilities {
}
}
- public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+ public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
serializePlan(roots, baos, conf, true);
- Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ @SuppressWarnings("unchecked")
+ List<Operator<?>> result =
+ deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
roots.getClass(), conf, true);
return result;
}
@@ -1371,8 +1374,8 @@ public final class Utilities {
codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
- return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec,
- progressable));
+ return SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec,
+ progressable);
}
@@ -1980,6 +1983,26 @@ public final class Utilities {
}
/**
+ * Get the session-specified class loader, or fall back to the current class loader if none is set.
+ *
+ * @return
+ */
+ public static ClassLoader getSessionSpecifiedClassLoader() {
+ SessionState state = SessionState.get();
+ if (state == null || state.getConf() == null) {
+ LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+ return JavaUtils.getClassLoader();
+ }
+ ClassLoader sessionCL = state.getConf().getClassLoader();
+ if (sessionCL != null){
+ LOG.debug("Use session specified class loader");
+ return sessionCL;
+ }
+ LOG.debug("Session specified class loader not found, use thread based class loader");
+ return JavaUtils.getClassLoader();
+ }
+
+ /**
* Create a URL from a string representing a path to a local file.
* The path string can be just a path, or can start with file:/, file:///
* @param onestr path string
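The call sites changed elsewhere in this commit (FunctionRegistry, FunctionTask, ListSinkOperator) all follow the same pattern once the new helper replaces JavaUtils.getClassLoader():

  // Resolve a UDF or formatter class against session-added jars first, falling back
  // to the thread-based class loader when no session (or session conf) is available.
  Class<?> clazz = Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());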
@@ -1999,6 +2022,33 @@ public final class Utilities {
return oneurl;
}
+ /**
+ * Get the jar files from a specified directory, or from a comma-separated list of jar names.
+ * @param path
+ * @return
+ */
+ public static Set<String> getJarFilesByPath(String path){
+ Set<String> result = new HashSet<String>();
+ if (path == null || path.isEmpty()) {
+ return result;
+ }
+
+ File paths = new File(path);
+ if (paths.exists() && paths.isDirectory()) {
+ // add all jar files under the reloadable auxiliary jar paths
+ Set<File> jarFiles = new HashSet<File>();
+ jarFiles.addAll(org.apache.commons.io.FileUtils.listFiles(
+ paths, new String[]{"jar"}, true));
+ for (File f : jarFiles) {
+ result.add(f.getAbsolutePath());
+ }
+ } else {
+ String[] files = path.split(",");
+ Collections.addAll(result, files);
+ }
+ return result;
+ }
+
/**
* Add new elements to the classpath.
*
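A quick illustration of the two input shapes the new helper accepts (the paths are made up):

  // A directory: every *.jar underneath it (searched recursively) is returned.
  Set<String> fromDir  = Utilities.getJarFilesByPath("/opt/hive/aux-jars");
  // A comma-separated list of jar names: returned as-is, one entry per name.
  Set<String> fromList = Utilities.getJarFilesByPath("/tmp/a.jar,/tmp/b.jar");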
@@ -2771,7 +2821,7 @@ public final class Utilities {
* first time it is caught, or SQLTransientException when the maxRetries has reached.
*/
public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt,
- int baseWindow, int maxRetries) throws SQLException {
+ long baseWindow, int maxRetries) throws SQLException {
Random r = new Random();
T result = null;
@@ -2813,7 +2863,7 @@ public final class Utilities {
* first time it is caught, or SQLTransientException when the maxRetries has reached.
*/
public static Connection connectWithRetry(String connectionString,
- int waitWindow, int maxRetries) throws SQLException {
+ long waitWindow, int maxRetries) throws SQLException {
Random r = new Random();
@@ -2855,7 +2905,7 @@ public final class Utilities {
* first time it is caught, or SQLTransientException when the maxRetries has reached.
*/
public static PreparedStatement prepareWithRetry(Connection conn, String stmt,
- int waitWindow, int maxRetries) throws SQLException {
+ long waitWindow, int maxRetries) throws SQLException {
Random r = new Random();
@@ -2895,7 +2945,7 @@ public final class Utilities {
* @param r a random generator.
* @return number of milliseconds for the next wait time.
*/
- public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+ public static long getRandomWaitTime(long baseWindow, int failures, Random r) {
return (long) (
baseWindow * failures + // grace period for the last round of attempt
baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
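
The parameter widening to long feeds the formula above. A self-contained sketch of the same backoff arithmetic, showing how the wait window expands with each failure:

    import java.util.Random;

    public class BackoffDemo {
      // Mirrors getRandomWaitTime: a grace period that grows linearly with the failure
      // count, plus a random component whose window also widens on every failure.
      static long randomWaitTime(long baseWindow, int failures, Random r) {
        return (long) (baseWindow * failures + baseWindow * (failures + 1) * r.nextDouble());
      }

      public static void main(String[] args) {
        Random r = new Random();
        long base = 1000L;  // 1 second base window
        // failures=0 -> 0..1s, failures=1 -> 1..3s, failures=2 -> 2..5s, failures=3 -> 3..7s
        for (int failures = 0; failures < 4; failures++) {
          System.out.println("attempt " + failures + ": wait "
              + randomWaitTime(base, failures, r) + " ms");
        }
      }
    }
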
@@ -3381,7 +3431,6 @@ public final class Utilities {
private static void createTmpDirs(Configuration conf,
List<Operator<? extends OperatorDesc>> ops) throws IOException {
- FsPermission fsPermission = new FsPermission((short)00777);
while (!ops.isEmpty()) {
Operator<? extends OperatorDesc> op = ops.remove(0);
@@ -3391,7 +3440,8 @@ public final class Utilities {
if (tempDir != null) {
Path tempPath = Utilities.toTempPath(tempDir);
- createDirsWithPermission(conf, tempPath, fsPermission);
+ FileSystem fs = tempPath.getFileSystem(conf);
+ fs.mkdirs(tempPath);
}
}
@@ -3404,7 +3454,7 @@ public final class Utilities {
/**
* Returns true if a plan is both configured for vectorized execution
* and vectorization is allowed. The plan may be configured for vectorization
- * but vectorization dissalowed eg. for FetchOperator execution.
+ * but vectorization disallowed eg. for FetchOperator execution.
*/
public static boolean isVectorMode(Configuration conf) {
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
@@ -3527,77 +3577,6 @@ public final class Utilities {
}
/**
- * @param conf the configuration used to derive the filesystem to create the path
- * @param mkdir the path to be created
- * @param fsPermission ignored if it is hive server session and doAs is enabled
- * @return true if successfully created the directory else false
- * @throws IOException if hdfs experiences any error conditions
- */
- public static boolean createDirsWithPermission(Configuration conf, Path mkdir,
- FsPermission fsPermission) throws IOException {
-
- boolean recursive = false;
- if (SessionState.get() != null) {
- recursive = SessionState.get().isHiveServerQuery() &&
- conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
- HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);
- // we reset the permission in case of hive server and doAs enabled because
- // currently scratch directory uses /tmp/hive-hive as the scratch directory.
- // However, with doAs enabled, the first user to create this directory would
- // own the directory and subsequent users cannot access the scratch directory.
- // The right fix is to have scratch dir per user.
- fsPermission = new FsPermission((short)00777);
- }
-
- // if we made it so far without exception we are good!
- return createDirsWithPermission(conf, mkdir, fsPermission, recursive);
- }
-
- private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask,
- String origUmask, FileSystem fs) throws IOException {
- if (unsetUmask) {
- if (origUmask != null) {
- conf.set(FsPermission.UMASK_LABEL, origUmask);
- } else {
- // TODO HIVE-7831
- // conf.unset(FsPermission.UMASK_LABEL);
- }
- }
-
- fs.close();
- }
-
- public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,
- FsPermission fsPermission, boolean recursive) throws IOException {
- String origUmask = null;
- LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive " +
- recursive);
-
- if (recursive) {
- origUmask = conf.get(FsPermission.UMASK_LABEL);
- // this umask is required because by default the hdfs mask is 022 resulting in
- // all parents getting the fsPermission & !(022) permission instead of fsPermission
- conf.set(FsPermission.UMASK_LABEL, "000");
- }
-
- FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
- boolean retval = false;
- try {
- retval = fs.mkdirs(mkdirPath, fsPermission);
- resetConfAndCloseFS(conf, recursive, origUmask, fs);
- } catch (IOException ioe) {
- try {
- resetConfAndCloseFS(conf, recursive, origUmask, fs);
- }
- catch (IOException e) {
- // do nothing - double failure
- }
- }
- return retval;
- }
-
-
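
For reference, the deleted helper relied on an umask-override pattern: it temporarily set the client-side umask to 000 so that every parent directory created by mkdirs() received the requested permission, then restored the original value. A simplified sketch of that pattern (the removed code additionally used a non-cached FileSystem so the umask change was actually honored):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class UmaskOverrideMkdirs {
      public static boolean mkdirsWithExactPermission(Configuration conf, Path dir,
          FsPermission perm) throws IOException {
        String origUmask = conf.get(FsPermission.UMASK_LABEL);
        conf.set(FsPermission.UMASK_LABEL, "000");  // otherwise parents get perm & ~022
        try {
          FileSystem fs = dir.getFileSystem(conf);
          return fs.mkdirs(dir, perm);
        } finally {
          if (origUmask != null) {
            conf.set(FsPermission.UMASK_LABEL, origUmask);
          }
        }
      }
    }
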
- /**
* Convert path to qualified path.
*
* @param conf
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Mon Sep 8 04:38:17 2014
@@ -28,39 +28,38 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
+@Target(ElementType.TYPE)
@Documented
-public @interface WindowFunctionDescription
-{
- Description description ();
- /**
- * controls whether this function can be applied to a Window.
- * <p>
- * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
- * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on.
- * For ranking defining a set of rows for every row makes no sense.
- * <p>
- * All other UDAFs can be computed for a Window.
- */
- boolean supportsWindow() default true;
- /**
- * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value.
- * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the
- * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value
- * for all the rows.
- */
- boolean pivotResult() default false;
+public @interface WindowFunctionDescription {
+ Description description ();
+ /**
+ * controls whether this function can be applied to a Window.
+ * <p>
+ * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
+ * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on.
+ * For ranking defining a set of rows for every row makes no sense.
+ * <p>
+ * All other UDAFs can be computed for a Window.
+ */
+ boolean supportsWindow() default true;
+ /**
+ * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value.
+ * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the
+ * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value
+ * for all the rows.
+ */
+ boolean pivotResult() default false;
- /**
- * Used in translations process to validate arguments
- * @return true if ranking function
- */
- boolean rankingFunction() default false;
+ /**
+ * Used in the translation process to validate arguments
+ * @return true if ranking function
+ */
+ boolean rankingFunction() default false;
- /**
- * Using in analytical functions to specify that UDF implies an ordering
- * @return true if the function implies order
- */
- boolean impliesOrder() default false;
+ /**
+ * Used in analytical functions to specify that the UDF implies an ordering
+ * @return true if the function implies order
+ */
+ boolean impliesOrder() default false;
}
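
To illustrate how this annotation is consumed (the resolver class below is hypothetical and not part of this patch), a ranking-style window function would be declared roughly like this:

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

    @WindowFunctionDescription(
        description = @Description(name = "rank_demo",
            value = "_FUNC_(x) - illustrative ranking function"),
        supportsWindow = false,  // ranking functions operate on the whole partition, not a window
        pivotResult = true,      // the evaluator returns a List: one output value per input row
        rankingFunction = true,
        impliesOrder = true)
    public class GenericUDAFRankDemo extends AbstractGenericUDAFResolver {
      // Evaluator plumbing omitted; a real resolver would override getEvaluator(TypeInfo[]).
    }
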
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Mon Sep 8 04:38:17 2014
@@ -22,45 +22,39 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hive.common.util.AnnotationUtils;
@SuppressWarnings("deprecation")
-public class WindowFunctionInfo implements CommonFunctionInfo
-{
- boolean supportsWindow = true;
- boolean pivotResult = false;
- boolean impliesOrder = false;
- FunctionInfo fInfo;
-
- WindowFunctionInfo(FunctionInfo fInfo)
- {
- assert fInfo.isGenericUDAF();
- this.fInfo = fInfo;
- Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
- WindowFunctionDescription def =
+public class WindowFunctionInfo implements CommonFunctionInfo {
+ boolean supportsWindow = true;
+ boolean pivotResult = false;
+ boolean impliesOrder = false;
+ FunctionInfo fInfo;
+
+ WindowFunctionInfo(FunctionInfo fInfo) {
+ assert fInfo.isGenericUDAF();
+ this.fInfo = fInfo;
+ Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
+ WindowFunctionDescription def =
AnnotationUtils.getAnnotation(wfnCls, WindowFunctionDescription.class);
- if ( def != null)
- {
- supportsWindow = def.supportsWindow();
- pivotResult = def.pivotResult();
- impliesOrder = def.impliesOrder();
- }
- }
-
- public boolean isSupportsWindow()
- {
- return supportsWindow;
- }
-
- public boolean isPivotResult()
- {
- return pivotResult;
- }
-
- public boolean isImpliesOrder(){
- return impliesOrder;
- }
- public FunctionInfo getfInfo()
- {
- return fInfo;
- }
+ if ( def != null) {
+ supportsWindow = def.supportsWindow();
+ pivotResult = def.pivotResult();
+ impliesOrder = def.impliesOrder();
+ }
+ }
+
+ public boolean isSupportsWindow() {
+ return supportsWindow;
+ }
+
+ public boolean isPivotResult() {
+ return pivotResult;
+ }
+
+ public boolean isImpliesOrder() {
+ return impliesOrder;
+ }
+ public FunctionInfo getfInfo() {
+ return fInfo;
+ }
@Override
public Class<?> getFunctionClass() {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Mon Sep 8 04:38:17 2014
@@ -63,7 +63,7 @@ public class MapJoinMemoryExhaustionHand
if(maxHeapSize == -1) {
this.maxHeapSize = 200L * 1024L * 1024L;
LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
- "defaulting maxHeapSize to 200MB");
+ "defaulting maxHeapSize to 200MB");
} else {
this.maxHeapSize = maxHeapSize;
}
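
A standalone sketch of the heap-ceiling fallback used above, relying only on the standard java.lang.management API:

    import java.lang.management.ManagementFactory;

    public class HeapLimitDemo {
      public static void main(String[] args) {
        long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        // Some JVMs report -1 when no maximum is defined; fall back to a fixed cap,
        // matching the 200MB default in the handler above.
        long maxHeapSize = (max == -1) ? 200L * 1024L * 1024L : max;
        System.out.println("effective heap ceiling: " + maxHeapSize + " bytes");
      }
    }
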
@@ -91,4 +91,4 @@ public class MapJoinMemoryExhaustionHand
throw new MapJoinMemoryExhaustionException(msg);
}
}
-}
\ No newline at end of file
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Sep 8 04:38:17 2014
@@ -371,7 +371,7 @@ public class ExecDriver extends Task<Map
Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
- if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+ if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
try {
handleSampling(driverContext, mWork, job, conf);
job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ public class ExecDriver extends Task<Map
} else {
throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
}
- sampler.writePartitionKeys(partitionFile, job);
+ sampler.writePartitionKeys(partitionFile, conf, job);
}
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Mon Sep 8 04:38:17 2014
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -219,8 +220,8 @@ public class HadoopJobExecHelper {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
//DecimalFormat longFormatter = new DecimalFormat("###,###");
long reportTime = System.currentTimeMillis();
- long maxReportInterval =
- HiveConf.getLongVar(job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL);
+ long maxReportInterval = HiveConf.getTimeVar(
+ job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
boolean fatal = false;
StringBuilder errMsg = new StringBuilder();
long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
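
A minimal sketch of the getTimeVar pattern introduced here: unlike getLongVar, it resolves a time-typed configuration value into an explicit unit.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class TimeVarDemo {
      public static long reportIntervalMs(JobConf job) {
        // Returns the configured progress-log interval converted to milliseconds.
        return HiveConf.getTimeVar(job,
            HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL,
            TimeUnit.MILLISECONDS);
      }
    }
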
@@ -634,7 +635,7 @@ public class HadoopJobExecHelper {
for (String clientStatsPublisherClass : clientStatsPublisherClasses) {
try {
clientStatsPublishers.add((ClientStatsPublisher) Class.forName(
- clientStatsPublisherClass.trim(), true, JavaUtils.getClassLoader()).newInstance());
+ clientStatsPublisherClass.trim(), true, Utilities.getSessionSpecifiedClassLoader()).newInstance());
} catch (Exception e) {
LOG.warn(e.getClass().getName() + " occured when trying to create class: "
+ clientStatsPublisherClass.trim() + " implementing ClientStatsPublisher interface");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Mon Sep 8 04:38:17 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
- if(!runningViaChild) {
+ if (!runningViaChild) {
// we are not running this mapred task via child jvm
// so directly invoke ExecDriver
return super.execute(driverContext);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Mon Sep 8 04:38:17 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer
private boolean[] sortableSortOrders;
private KeyValueHelper writeHelper;
- private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+ private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
public MapJoinBytesTableContainer(Configuration hconf,
MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
@@ -476,6 +476,7 @@ public class MapJoinBytesTableContainer
return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
}
+ @Override
public void addRow(List<Object> t) {
if (dummyRow != null || !refs.isEmpty()) {
throw new RuntimeException("Cannot add rows when not empty");
@@ -484,9 +485,11 @@ public class MapJoinBytesTableContainer
}
// Various unsupported methods.
+ @Override
public void addRow(Object[] value) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays");
}
+ @Override
public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized");
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Mon Sep 8 04:38:17 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
import org.apache.hadoop.io.Writable;
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Mon Sep 8 04:38:17 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable;
@SuppressWarnings("deprecation")
public class MapJoinTableContainerSerDe {
-
+
private final MapJoinObjectSerDeContext keyContext;
private final MapJoinObjectSerDeContext valueContext;
public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext,
@@ -70,7 +70,7 @@ public class MapJoinTableContainerSerDe
}
try {
Writable keyContainer = keySerDe.getSerializedClass().newInstance();
- Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
+ Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
int numKeys = in.readInt();
for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
MapJoinKeyObject key = new MapJoinKeyObject();
@@ -89,7 +89,7 @@ public class MapJoinTableContainerSerDe
public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
throws HiveException {
int numKeys = tableContainer.size();
- try {
+ try {
out.writeUTF(tableContainer.getClass().getName());
out.writeObject(tableContainer.getMetaData());
out.writeInt(numKeys);
@@ -108,7 +108,7 @@ public class MapJoinTableContainerSerDe
throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer);
}
}
-
+
public static void persistDummyTable(ObjectOutputStream out) throws IOException {
MapJoinPersistableTableContainer tableContainer = new HashMapWrapper();
out.writeUTF(tableContainer.getClass().getName());
@@ -127,8 +127,8 @@ public class MapJoinTableContainerSerDe
return constructor.newInstance(metaData);
} catch (Exception e) {
String msg = "Error while attemping to create table container" +
- " of type: " + name + ", with metaData: " + metaData;
+ " of type: " + name + ", with metaData: " + metaData;
throw new HiveException(msg, e);
}
}
-}
\ No newline at end of file
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Mon Sep 8 04:38:17 2014
@@ -270,7 +270,7 @@ public class PTFRowContainer<Row extends
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath,
- BytesWritable.class, valueClass, isCompressed, progress);
+ BytesWritable.class, valueClass, isCompressed, progress);
return new PTFRecordWriter(outStream);
}