Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [21/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Mon Sep 8 04:38:17 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Sep 8 04:38:17 2014
@@ -96,9 +96,6 @@ class WriterImpl implements Writer, Memo
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
- // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
- private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
-
// threshold above which buffer size will be automatically resized
private static final int COLUMN_COUNT_THRESHOLD = 1000;
@@ -135,8 +132,6 @@ class WriterImpl implements Writer, Memo
new TreeMap<String, ByteString>();
private final StreamFactory streamFactory = new StreamFactory();
private final TreeWriter treeWriter;
- private final OrcProto.RowIndex.Builder rowIndex =
- OrcProto.RowIndex.newBuilder();
private final boolean buildIndex;
private final MemoryManager memoryManager;
private final OrcFile.Version version;
@@ -678,7 +673,7 @@ class WriterImpl implements Writer, Memo
if (rowIndexStream != null) {
if (rowIndex.getEntryCount() != requiredIndexEntries) {
throw new IllegalArgumentException("Column has wrong number of " +
- "index entries found: " + rowIndexEntry + " expected: " +
+ "index entries found: " + rowIndex.getEntryCount() + " expected: " +
requiredIndexEntries);
}
rowIndex.build().writeTo(rowIndexStream);
@@ -1005,6 +1000,8 @@ class WriterImpl implements Writer, Memo
private final float dictionaryKeySizeThreshold;
private boolean useDictionaryEncoding = true;
private boolean isDirectV2 = true;
+ private boolean doneDictionaryCheck;
+ private final boolean strideDictionaryCheck;
StringTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1025,9 +1022,14 @@ class WriterImpl implements Writer, Memo
directLengthOutput = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
- HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
- HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
- defaultFloatVal);
+ HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
+ HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
+ defaultFloatVal);
+ strideDictionaryCheck = writer.getConfiguration().getBoolean(
+ HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname,
+ HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.
+ defaultBoolVal);
+ doneDictionaryCheck = false;
}
/**
@@ -1045,21 +1047,71 @@ class WriterImpl implements Writer, Memo
super.write(obj);
if (obj != null) {
Text val = getTextValue(obj);
- rows.add(dictionary.add(val));
+ if (useDictionaryEncoding || !strideDictionaryCheck) {
+ rows.add(dictionary.add(val));
+ } else {
+ // write data and length
+ directStreamOutput.write(val.getBytes(), 0, val.getLength());
+ directLengthOutput.write(val.getLength());
+ }
indexStatistics.updateString(val);
}
}
+ private boolean checkDictionaryEncoding() {
+ if (!doneDictionaryCheck) {
+ // Set the flag indicating whether or not to use dictionary encoding
+ // based on whether or not the fraction of distinct keys over number of
+ // non-null rows is less than the configured threshold
+ float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+ useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+ doneDictionaryCheck = true;
+ }
+ return useDictionaryEncoding;
+ }
+
@Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
- // Set the flag indicating whether or not to use dictionary encoding
- // based on whether or not the fraction of distinct keys over number of
- // non-null rows is less than the configured threshold
- useDictionaryEncoding =
- (!isDirectV2) || (rows.size() > 0 &&
- (float)(dictionary.size()) / rows.size() <=
- dictionaryKeySizeThreshold);
+ // if the stripe has fewer rows than dictionaryCheckAfterRows, the dictionary
+ // check would not have happened yet, so do it again here.
+ checkDictionaryEncoding();
+
+ if (useDictionaryEncoding) {
+ flushDictionary();
+ } else {
+ // flush out any leftover entries from the dictionary
+ if (rows.size() > 0) {
+ flushDictionary();
+ }
+
+ // suppress the stream for every stripe if dictionary is disabled
+ stringOutput.suppress();
+ }
+
+ // we need to build the rowindex before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, requiredIndexEntries);
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ directStreamOutput.flush();
+ directLengthOutput.flush();
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+
+ if (!useDictionaryEncoding) {
+ // record the start positions of the first index stride of the next stripe,
+ // i.e. the beginning of the direct streams when dictionary encoding is disabled
+ recordDirectStreamPosition();
+ }
+ }
+
+ private void flushDictionary() throws IOException {
final int[] dumpOrder = new int[dictionary.size()];
if (useDictionaryEncoding) {
@@ -1113,21 +1165,7 @@ class WriterImpl implements Writer, Memo
}
}
}
- // we need to build the rowindex before calling super, since it
- // writes it out.
- super.writeStripe(builder, requiredIndexEntries);
- stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
- directStreamOutput.flush();
- directLengthOutput.flush();
- // reset all of the fields to be ready for the next stripe.
- dictionary.clear();
rows.clear();
- savedRowIndex.clear();
- rowIndexValueCount.clear();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(0L);
}
@Override
@@ -1165,10 +1203,30 @@ class WriterImpl implements Writer, Memo
OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
rowIndexEntry.setStatistics(indexStatistics.serialize());
indexStatistics.reset();
- savedRowIndex.add(rowIndexEntry.build());
+ OrcProto.RowIndexEntry base = rowIndexEntry.build();
+ savedRowIndex.add(base);
rowIndexEntry.clear();
recordPosition(rowIndexPosition);
rowIndexValueCount.add(Long.valueOf(rows.size()));
+ if (strideDictionaryCheck) {
+ checkDictionaryEncoding();
+ }
+ if (!useDictionaryEncoding) {
+ if (rows.size() > 0) {
+ flushDictionary();
+ // just record the start positions of next index stride
+ recordDirectStreamPosition();
+ } else {
+ // record the start positions of next index stride
+ recordDirectStreamPosition();
+ getRowIndex().addEntry(base);
+ }
+ }
+ }
+
+ private void recordDirectStreamPosition() throws IOException {
+ directStreamOutput.getPosition(rowIndexPosition);
+ directLengthOutput.getPosition(rowIndexPosition);
}
@Override
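
Note: the stride-level dictionary check added above reduces to a one-time
ratio test: dictionary encoding is kept only while the fraction of distinct
values over non-null rows stays at or below the threshold read from
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD. A minimal sketch of
that decision, using hypothetical stand-in names rather than the actual
WriterImpl members (and omitting the isDirectV2 escape hatch):

class DictionaryCheckSketch {
  private final float keySizeThreshold;  // from HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD
  private boolean useDictionaryEncoding = true;
  private boolean doneDictionaryCheck = false;

  DictionaryCheckSketch(float keySizeThreshold) {
    this.keySizeThreshold = keySizeThreshold;
  }

  // Decide once, at the end of the first index stride (or at stripe flush if
  // the stripe is smaller than a stride), and then stick with the answer.
  boolean checkDictionaryEncoding(int distinctValues, int nonNullRows) {
    if (!doneDictionaryCheck) {
      float ratio = nonNullRows > 0 ? (float) distinctValues / nonNullRows : 0.0f;
      useDictionaryEncoding = ratio <= keySizeThreshold;
      doneDictionaryCheck = true;
    }
    return useDictionaryEncoding;
  }
}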
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java Mon Sep 8 04:38:17 2014
@@ -95,7 +95,7 @@ public class HiveSchemaConverter {
int scale = decimalTypeInfo.scale();
int bytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
return Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(bytes).as(OriginalType.DECIMAL).
- scale(scale).precision(prec).named(name);
+ scale(scale).precision(prec).named(name);
} else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) {
throw new UnsupportedOperationException("Unknown type not implemented");
} else {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Mon Sep 8 04:38:17 2014
@@ -140,7 +140,7 @@ public class DataWritableReadSupport ext
throw new IllegalStateException(msg);
}
}
- }
+ }
}
requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
typeListWanted), fileSchema, configuration);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Mon Sep 8 04:38:17 2014
@@ -27,6 +27,8 @@ import org.apache.commons.lang.StringUti
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java Mon Sep 8 04:38:17 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.metadata.*;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -38,7 +39,7 @@ public class EmbeddedLockManager impleme
private HiveLockManagerCtx ctx;
- private int sleepTime = 1000;
+ private long sleepTime = 1000;
private int numRetriesForLock = 0;
private int numRetriesForUnLock = 0;
@@ -82,12 +83,13 @@ public class EmbeddedLockManager impleme
public void refresh() {
HiveConf conf = ctx.getConf();
- sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+ sleepTime = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
}
- public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, int sleepTime)
+ public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, long sleepTime)
throws LockException {
for (int i = 0; i <= numRetriesForLock; i++) {
if (i > 0) {
@@ -101,7 +103,7 @@ public class EmbeddedLockManager impleme
return null;
}
- private void sleep(int sleepTime) {
+ private void sleep(long sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
@@ -109,7 +111,7 @@ public class EmbeddedLockManager impleme
}
}
- public List<HiveLock> lock(List<HiveLockObj> objs, int numRetriesForLock, int sleepTime)
+ public List<HiveLock> lock(List<HiveLockObj> objs, int numRetriesForLock, long sleepTime)
throws LockException {
sortLocks(objs);
for (int i = 0; i <= numRetriesForLock; i++) {
@@ -132,7 +134,7 @@ public class EmbeddedLockManager impleme
}
private List<HiveLock> lockPrimitive(List<HiveLockObj> objs, int numRetriesForLock,
- int sleepTime) throws LockException {
+ long sleepTime) throws LockException {
List<HiveLock> locks = new ArrayList<HiveLock>();
for (HiveLockObj obj : objs) {
HiveLock lock = lockPrimitive(obj.getObj(), obj.getMode());
@@ -164,7 +166,7 @@ public class EmbeddedLockManager impleme
});
}
- public void unlock(HiveLock hiveLock, int numRetriesForUnLock, int sleepTime)
+ public void unlock(HiveLock hiveLock, int numRetriesForUnLock, long sleepTime)
throws LockException {
String[] paths = hiveLock.getHiveLockObject().getPaths();
HiveLockObjectData data = hiveLock.getHiveLockObject().getData();
@@ -179,7 +181,7 @@ public class EmbeddedLockManager impleme
throw new LockException("Failed to release lock " + hiveLock);
}
- public void releaseLocks(List<HiveLock> hiveLocks, int numRetriesForUnLock, int sleepTime) {
+ public void releaseLocks(List<HiveLock> hiveLocks, int numRetriesForUnLock, long sleepTime) {
for (HiveLock locked : hiveLocks) {
try {
unlock(locked, numRetriesForUnLock, sleepTime);
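
Note: the retry-sleep change above (repeated in ZooKeeperHiveLockManager
below) swaps an int of seconds multiplied by 1000 for HiveConf's unit-aware
accessor, which returns the configured value converted to the requested
TimeUnit. A minimal usage sketch, assuming only a default HiveConf:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

// getTimeVar converts the configured value to the requested unit, so callers
// hold a long of milliseconds instead of doing the seconds-to-millis math.
HiveConf conf = new HiveConf();
long sleepTime = conf.getTimeVar(
    HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);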
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Mon Sep 8 04:38:17 2014
@@ -37,6 +37,7 @@ import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -53,7 +54,7 @@ public class ZooKeeperHiveLockManager im
private int sessionTimeout;
private String quorumServers;
- private int sleepTime;
+ private long sleepTime;
private int numRetriesForLock;
private int numRetriesForUnLock;
@@ -106,7 +107,8 @@ public class ZooKeeperHiveLockManager im
sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
- sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+ sleepTime = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
@@ -132,7 +134,8 @@ public class ZooKeeperHiveLockManager im
@Override
public void refresh() {
HiveConf conf = ctx.getConf();
- sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+ sleepTime = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
}
@@ -268,7 +271,7 @@ public class ZooKeeperHiveLockManager im
* @param mode
* The mode of the lock
* @param keepAlive
- * Whether the lock is to be persisted after the statement Acuire the
+ * Whether the lock is to be persisted after the statement Acquire the
* lock. Return null if a conflicting lock is present.
**/
public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
@@ -515,8 +518,8 @@ public class ZooKeeperHiveLockManager im
try {
int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
String quorumServers = getQuorumServers(conf);
- Watcher dummWatcher = new DummyWatcher();
- zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummWatcher);
+ Watcher dummyWatcher = new DummyWatcher();
+ zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
Exception lastExceptionGot = null;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Sep 8 04:38:17 2014
@@ -109,6 +109,7 @@ import org.apache.hadoop.hive.shims.Hado
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import com.google.common.collect.Sets;
@@ -128,6 +129,7 @@ public class Hive {
private HiveConf conf = null;
private IMetaStoreClient metaStoreClient;
+ private UserGroupInformation owner;
private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
@Override
@@ -181,7 +183,11 @@ public class Hive {
*/
public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
Hive db = hiveDB.get();
- if (db == null || needsRefresh) {
+ if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
+ if (db != null) {
+ LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
+ ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+ }
closeCurrent();
c.set("fs.scheme.class", "dfs");
Hive newdb = new Hive(c);
@@ -194,6 +200,11 @@ public class Hive {
public static Hive get() throws HiveException {
Hive db = hiveDB.get();
+ if (db != null && !db.isCurrentUserOwner()) {
+ LOG.debug("Creating new db. db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+ db.close();
+ db = null;
+ }
if (db == null) {
SessionState session = SessionState.get();
db = new Hive(session == null ? new HiveConf(Hive.class) : session.getConf());
@@ -220,6 +231,17 @@ public class Hive {
conf = c;
}
+
+ private boolean isCurrentUserOwner() throws HiveException {
+ try {
+ return owner == null || owner.equals(UserGroupInformation.getCurrentUser());
+ } catch(IOException e) {
+ throw new HiveException("Error getting current user: " + e.getMessage(), e);
+ }
+ }
+
+
+
/**
* closes the connection to metastore for the calling thread
*/
@@ -2496,6 +2518,13 @@ private void constructOneLBLocationMap(F
@Unstable
public IMetaStoreClient getMSC() throws MetaException {
if (metaStoreClient == null) {
+ try {
+ owner = UserGroupInformation.getCurrentUser();
+ } catch(IOException e) {
+ String msg = "Error getting current user: " + e.getMessage();
+ LOG.error(msg, e);
+ throw new MetaException(msg + "\n" + StringUtils.stringifyException(e));
+ }
metaStoreClient = createMetaStoreClient();
}
return metaStoreClient;
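
Note: the ownership guard added above ties the cached thread-local Hive
object to the UserGroupInformation recorded when its metastore client was
created; get() discards and rebuilds the object when the current user no
longer matches. The test itself, shown here as a standalone helper with the
same names as the diff:

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

// Reusable while the recorded owner matches the current user; a null owner
// means no metastore client has been created yet, so reuse is safe.
boolean isCurrentUserOwner(UserGroupInformation owner) throws IOException {
  return owner == null || owner.equals(UserGroupInformation.getCurrentUser());
}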
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java Mon Sep 8 04:38:17 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
@@ -307,7 +308,7 @@ public final class HiveUtils {
try {
Class<? extends HiveStorageHandler> handlerClass =
(Class<? extends HiveStorageHandler>)
- Class.forName(className, true, JavaUtils.getClassLoader());
+ Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
HiveStorageHandler storageHandler = ReflectionUtils.newInstance(handlerClass, conf);
return storageHandler;
} catch (ClassNotFoundException e) {
@@ -329,7 +330,7 @@ public final class HiveUtils {
try {
Class<? extends HiveIndexHandler> handlerClass =
(Class<? extends HiveIndexHandler>)
- Class.forName(indexHandlerClass, true, JavaUtils.getClassLoader());
+ Class.forName(indexHandlerClass, true, Utilities.getSessionSpecifiedClassLoader());
HiveIndexHandler indexHandler = ReflectionUtils.newInstance(handlerClass, conf);
return indexHandler;
} catch (ClassNotFoundException e) {
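
Note: the same classloader substitution recurs below in Partition.java,
Table.java, and ConstantPropagateProcFactory.java: reflective class loads go
through the session-specified classloader, which also sees jars registered
during the session (e.g. via ADD JAR), instead of the static one from
JavaUtils. In sketch form, with className standing in for a handler name read
from table metadata:

import org.apache.hadoop.hive.ql.exec.Utilities;

// Resolve the class against the session classloader so session-added jars
// are visible to the lookup.
Class<?> handlerClass = Class.forName(className, true,
    Utilities.getSessionSpecifiedClassLoader());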
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Mon Sep 8 04:38:17 2014
@@ -302,7 +302,7 @@ public class Partition implements Serial
}
try {
inputFormatClass = ((Class<? extends InputFormat>) Class.forName(clsName, true,
- JavaUtils.getClassLoader()));
+ Utilities.getSessionSpecifiedClassLoader()));
} catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + clsName, e);
}
@@ -322,7 +322,7 @@ public class Partition implements Serial
}
try {
Class<?> c = (Class.forName(clsName, true,
- JavaUtils.getClassLoader()));
+ Utilities.getSessionSpecifiedClassLoader()));
// Replace FileOutputFormat for backward compatibility
if (!HiveOutputFormat.class.isAssignableFrom(c)) {
outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c,false);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Mon Sep 8 04:38:17 2014
@@ -13,6 +13,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.FileUtils;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Mon Sep 8 04:38:17 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -293,7 +294,7 @@ public class Table implements Serializab
inputFormatClass = getStorageHandler().getInputFormatClass();
} else {
inputFormatClass = (Class<? extends InputFormat>)
- Class.forName(className, true, JavaUtils.getClassLoader());
+ Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -329,7 +330,7 @@ public class Table implements Serializab
}
else {
c = Class.forName(className, true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
}
}
if (!HiveOutputFormat.class.isAssignableFrom(c)) {
@@ -677,7 +678,7 @@ public class Table implements Serializab
}
try {
setInputFormatClass((Class<? extends InputFormat<WritableComparable, Writable>>) Class
- .forName(name, true, JavaUtils.getClassLoader()));
+ .forName(name, true, Utilities.getSessionSpecifiedClassLoader()));
} catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + name, e);
}
@@ -690,7 +691,7 @@ public class Table implements Serializab
return;
}
try {
- Class<?> origin = Class.forName(name, true, JavaUtils.getClassLoader());
+ Class<?> origin = Class.forName(name, true, Utilities.getSessionSpecifiedClassLoader());
setOutputFormatClass(HiveFileFormatUtils
.getOutputFormatSubstitute(origin,false));
} catch (ClassNotFoundException e) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Mon Sep 8 04:38:17 2014
@@ -369,7 +369,7 @@ public final class ColumnPrunerProcFacto
return null;
}
cols = cols == null ? new ArrayList<String>() : cols;
-
+
cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
cols);
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
@@ -479,13 +479,13 @@ public final class ColumnPrunerProcFacto
flags[index] = true;
colLists = Utilities.mergeUniqElems(colLists, valCols.get(index).getCols());
}
-
+
Collections.sort(colLists);
pruneReduceSinkOperator(flags, op, cppCtx);
cppCtx.getPrunedColLists().put(op, colLists);
return null;
}
-
+
// Reduce Sink contains the columns needed - no need to aggregate from
// children
for (ExprNodeDesc val : valCols) {
@@ -519,7 +519,7 @@ public final class ColumnPrunerProcFacto
if (cols == null) {
return null;
}
-
+
Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
// As columns go down the DAG, the LVJ will transform internal column
// names from something like 'key' to '_col0'. Because of this, we need
@@ -604,8 +604,8 @@ public final class ColumnPrunerProcFacto
Object... nodeOutputs) throws SemanticException {
SelectOperator op = (SelectOperator) nd;
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-
-
+
+
if (op.getChildOperators() != null) {
for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
// UDTF is not handled yet, so the parent SelectOp of UDTF should just assume
@@ -858,11 +858,11 @@ public final class ColumnPrunerProcFacto
if (inputSchema != null) {
ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
ArrayList<ColumnInfo> inputCols = inputSchema.getSignature();
- for (ColumnInfo i: inputCols) {
+ for (ColumnInfo i: inputCols) {
if (cols.contains(i.getInternalName())) {
rs.add(i);
}
- }
+ }
op.getSchema().setSignature(rs);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Mon Sep 8 04:38:17 2014
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -29,7 +29,6 @@ import java.util.Stack;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Ro
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -100,7 +100,7 @@ public final class ConstantPropagateProc
/**
* Get ColumnInfo from column expression.
- *
+ *
* @param rr
* @param desc
* @return
@@ -139,7 +139,7 @@ public final class ConstantPropagateProc
/**
* Cast type from expression type to expected type ti.
- *
+ *
* @param desc constant expression
* @param ti expected type info
* @return cast constant, or null if the type cast failed.
@@ -189,10 +189,10 @@ public final class ConstantPropagateProc
/**
* Fold input expression desc.
- *
+ *
* If desc is a UDF and all parameters are constants, evaluate it. If desc is a column expression,
* find it from propagated constants, and if there is, replace it with constant.
- *
+ *
* @param desc folding expression
* @param constants current propagated constant map
* @param cppCtx
@@ -275,7 +275,7 @@ public final class ConstantPropagateProc
String udfClassName = bridge.getUdfClassName();
try {
UDF udfInternal =
- (UDF) Class.forName(bridge.getUdfClassName(), true, JavaUtils.getClassLoader())
+ (UDF) Class.forName(bridge.getUdfClassName(), true, Utilities.getSessionSpecifiedClassLoader())
.newInstance();
files = udfInternal.getRequiredFiles();
jars = udf.getRequiredJars();
@@ -296,7 +296,7 @@ public final class ConstantPropagateProc
/**
* Propagate assignment expression, adding an entry into constant map constants.
- *
+ *
* @param udf expression UDF, currently only 2 UDFs are supported: '=' and 'is null'.
* @param newExprs child expressions (parameters).
* @param cppCtx
@@ -350,7 +350,7 @@ public final class ConstantPropagateProc
ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
if (Boolean.TRUE.equals(c.getValue())) {
- // if true, prune it
+ // if true, prune it
return newExprs.get(Math.abs(i - 1));
} else {
@@ -384,7 +384,7 @@ public final class ConstantPropagateProc
/**
* Evaluate column, replace the deterministic columns with constants if possible
- *
+ *
* @param desc
* @param ctx
* @param op
@@ -435,7 +435,7 @@ public final class ConstantPropagateProc
/**
* Evaluate UDF
- *
+ *
* @param udf UDF object
* @param exprs
* @param oldExprs
@@ -512,7 +512,7 @@ public final class ConstantPropagateProc
/**
* Change operator row schema, replace column with constant if it is.
- *
+ *
* @param op
* @param constants
* @throws SemanticException
@@ -584,7 +584,7 @@ public final class ConstantPropagateProc
/**
* Factory method to get the ConstantPropagateFilterProc class.
- *
+ *
* @return ConstantPropagateFilterProc
*/
public static ConstantPropagateFilterProc getFilterProc() {
@@ -621,7 +621,7 @@ public final class ConstantPropagateProc
/**
* Factory method to get the ConstantPropagateGroupByProc class.
- *
+ *
* @return ConstantPropagateGroupByProc
*/
public static ConstantPropagateGroupByProc getGroupByProc() {
@@ -650,7 +650,7 @@ public final class ConstantPropagateProc
/**
* Factory method to get the ConstantPropagateDefaultProc class.
- *
+ *
* @return ConstantPropagateDefaultProc
*/
public static ConstantPropagateDefaultProc getDefaultProc() {
@@ -683,7 +683,7 @@ public final class ConstantPropagateProc
/**
* The Factory method to get the ConstantPropagateSelectProc class.
- *
+ *
* @return ConstantPropagateSelectProc
*/
public static ConstantPropagateSelectProc getSelectProc() {
@@ -877,7 +877,7 @@ public final class ConstantPropagateProc
return null;
}
- // Note: the following code (removing folded constants in exprs) is deeply coupled with
+ // Note: the following code (removing folded constants in exprs) is deeply coupled with
// ColumnPruner optimizer.
// Assuming ColumnPrunner will remove constant columns so we don't deal with output columns.
// Except one case that the join operator is followed by a redistribution (RS operator).
@@ -941,12 +941,12 @@ public final class ConstantPropagateProc
return null;
}
- List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
- for (ExprNodeDesc expr : pred.getChildren()) {
- ExprNodeDesc constant = foldExpr(expr, constants, cppCtx, op, 0, false);
- newChildren.add(constant);
+ ExprNodeDesc constant = foldExpr(pred, constants, cppCtx, op, 0, false);
+ if (constant instanceof ExprNodeGenericFuncDesc) {
+ conf.setFilterExpr((ExprNodeGenericFuncDesc) constant);
+ } else {
+ conf.setFilterExpr(null);
}
- pred.setChildren(newChildren);
return null;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Sep 8 04:38:17 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -363,6 +366,19 @@ public class ConvertJoinMapJoin implemen
Operator<? extends OperatorDesc> parentBigTableOp
= mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
+ for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+ // we might have generated a dynamic partition operator chain. Since
+ // we're removing the reduce sink, we need to remove that too.
+ Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
+ for (Operator<?> c : p.getChildOperators()) {
+ if (hasDynamicPartitionBroadcast(c)) {
+ dynamicPartitionOperators.add(c);
+ }
+ }
+ for (Operator<?> c : dynamicPartitionOperators) {
+ p.removeChild(c);
+ }
+ }
mapJoinOp.getParentOperators().remove(bigTablePosition);
if (!(mapJoinOp.getParentOperators().contains(
parentBigTableOp.getParentOperators().get(0)))) {
@@ -380,4 +396,16 @@ public class ConvertJoinMapJoin implemen
return mapJoinOp;
}
+
+ private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+ if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+ return true;
+ }
+ for (Operator<?> c : op.getChildOperators()) {
+ if (hasDynamicPartitionBroadcast(c)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Sep 8 04:38:17 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+import org.apache.hadoop.hive.ql.ppd.SyntheticJoinPredicate;
/**
* Implementation of the optimizer.
@@ -55,6 +56,7 @@ public class Optimizer {
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
transformations.add(new PredicateTransitivePropagate());
+ transformations.add(new SyntheticJoinPredicate());
transformations.add(new PredicatePushDown());
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
@@ -125,9 +127,9 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
- if (pctx.getContext().getExplain() ||
- HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
- HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ if (pctx.getContext().getExplain()
+ && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java Mon Sep 8 04:38:17 2014
@@ -186,8 +186,7 @@ public abstract class PrunerExpressionOp
return ((ExprNodeNullDesc) nd).clone();
}
- assert (false);
- return null;
+ return new ExprNodeConstantDesc(((ExprNodeDesc)nd).getTypeInfo(), null);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Mon Sep 8 04:38:17 2014
@@ -26,8 +26,6 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hive.ql.parse.G
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -134,7 +131,8 @@ public class ReduceSinkMapJoinProc imple
String prefix = Utilities.ReduceField.KEY.toString();
for (String keyCol : keyCols) {
ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol);
- ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol);
+ ColStatistics cs =
+ StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol);
if (cs == null || cs.getCountDistint() <= 0) {
maxKeyCount = Long.MAX_VALUE;
break;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Mon Sep 8 04:38:17 2014
@@ -353,14 +353,14 @@ public class OpProcFactory {
if (inpOp.getSchema() != null && inpOp.getSchema().getSignature() != null ) {
for(ColumnInfo ci : inpOp.getSchema().getSignature()) {
Dependency inp_dep = lctx.getIndex().getDependency(inpOp, ci);
- // The dependency can be null as some of the input cis may not have
- // been set in case of joins.
- if (inp_dep != null) {
- for(BaseColumnInfo bci : inp_dep.getBaseCols()) {
- new_type = LineageCtx.getNewDependencyType(inp_dep.getType(), new_type);
- tai_set.add(bci.getTabAlias());
- }
- }
+ // The dependency can be null as some of the input cis may not have
+ // been set in case of joins.
+ if (inp_dep != null) {
+ for(BaseColumnInfo bci : inp_dep.getBaseCols()) {
+ new_type = LineageCtx.getNewDependencyType(inp_dep.getType(), new_type);
+ tai_set.add(bci.getTabAlias());
+ }
+ }
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Mon Sep 8 04:38:17 2014
@@ -396,8 +396,7 @@ public final class PcrExprProcFactory {
return new NodeInfoWrapper(WalkState.CONSTANT, null,
(ExprNodeDesc) nd);
}
- assert (false);
- return null;
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, (ExprNodeDesc)nd);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Mon Sep 8 04:38:17 2014
@@ -156,6 +156,10 @@ public class Vectorizer implements Physi
// The regex matches only the "decimal" prefix of the type.
patternBuilder.append("|decimal.*");
+ // CHAR and VARCHAR types can be specified with maximum length.
+ patternBuilder.append("|char.*");
+ patternBuilder.append("|varchar.*");
+
supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
supportedGenericUDFs.add(GenericUDFOPPlus.class);
@@ -248,6 +252,8 @@ public class Vectorizer implements Physi
supportedGenericUDFs.add(GenericUDFTimestamp.class);
supportedGenericUDFs.add(GenericUDFToDecimal.class);
supportedGenericUDFs.add(GenericUDFToDate.class);
+ supportedGenericUDFs.add(GenericUDFToChar.class);
+ supportedGenericUDFs.add(GenericUDFToVarchar.class);
// For conditional expressions
supportedGenericUDFs.add(GenericUDFIf.class);
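
Note: CHAR and VARCHAR type names carry a length parameter (for example
varchar(100)), so the two appended branches let the supported-types regex
match them. An illustrative check against a trimmed-down pattern, not
Vectorizer's full one:

import java.util.regex.Pattern;

// The "char.*" and "varchar.*" branches accept parameterized type names.
Pattern supported = Pattern.compile("decimal.*|char.*|varchar.*");
boolean ok = supported.matcher("varchar(100)").matches();   // true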
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Mon Sep 8 04:38:17 2014
@@ -20,11 +20,11 @@ package org.apache.hadoop.hive.ql.optimi
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -164,7 +164,7 @@ public class StatsRulesProcFactory {
sop.getSchema());
long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
stats.setColumnStats(colStats);
- stats.setDataSize(dataSize);
+ stats.setDataSize(setMaxIfInvalid(dataSize));
sop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
@@ -250,7 +250,8 @@ public class StatsRulesProcFactory {
ExprNodeDesc pred = fop.getConf().getPredicate();
// evaluate filter expression and update statistics
- long newNumRows = evaluateExpression(parentStats, pred, aspCtx, neededCols);
+ long newNumRows = evaluateExpression(parentStats, pred, aspCtx,
+ neededCols, fop);
Statistics st = parentStats.clone();
if (satisfyPrecondition(parentStats)) {
@@ -260,7 +261,7 @@ public class StatsRulesProcFactory {
// result in number of rows getting more than the input rows in
// which case stats need not be updated
if (newNumRows <= parentStats.getNumRows()) {
- updateStats(st, newNumRows, true);
+ updateStats(st, newNumRows, true, fop);
}
if (LOG.isDebugEnabled()) {
@@ -270,7 +271,7 @@ public class StatsRulesProcFactory {
// update only the basic statistics in the absence of column statistics
if (newNumRows <= parentStats.getNumRows()) {
- updateStats(st, newNumRows, false);
+ updateStats(st, newNumRows, false, fop);
}
if (LOG.isDebugEnabled()) {
@@ -287,7 +288,8 @@ public class StatsRulesProcFactory {
}
private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
- AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+ FilterOperator fop) throws CloneNotSupportedException {
long newNumRows = 0;
Statistics andStats = null;
@@ -302,24 +304,26 @@ public class StatsRulesProcFactory {
// evaluate children
for (ExprNodeDesc child : genFunc.getChildren()) {
- newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child, aspCtx, neededCols);
+ newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child,
+ aspCtx, neededCols, fop);
if (satisfyPrecondition(aspCtx.getAndExprStats())) {
- updateStats(aspCtx.getAndExprStats(), newNumRows, true);
+ updateStats(aspCtx.getAndExprStats(), newNumRows, true, fop);
} else {
- updateStats(aspCtx.getAndExprStats(), newNumRows, false);
+ updateStats(aspCtx.getAndExprStats(), newNumRows, false, fop);
}
}
} else if (udf instanceof GenericUDFOPOr) {
// for OR condition independently compute and update stats
for (ExprNodeDesc child : genFunc.getChildren()) {
- newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols);
+ newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
+ fop);
}
} else if (udf instanceof GenericUDFOPNot) {
- newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols);
+ newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
} else {
// single predicate condition
- newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols);
+ newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols, fop);
}
} else if (pred instanceof ExprNodeColumnDesc) {
@@ -351,8 +355,9 @@ public class StatsRulesProcFactory {
return newNumRows;
}
- private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsProcCtx aspCtx,
- List<String> neededCols) throws CloneNotSupportedException {
+ private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred,
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols, FilterOperator fop)
+ throws CloneNotSupportedException {
long numRows = stats.getNumRows();
@@ -364,8 +369,9 @@ public class StatsRulesProcFactory {
// GenericUDF
long newNumRows = 0;
- for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) pred).getChildren()) {
- newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols);
+ for (ExprNodeDesc child : genFunc.getChildren()) {
+ newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols,
+ fop);
}
return numRows - newNumRows;
} else if (leaf instanceof ExprNodeConstantDesc) {
@@ -398,8 +404,7 @@ public class StatsRulesProcFactory {
return numRows / 2;
}
- private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred,
- AnnotateStatsProcCtx aspCtx) {
+ private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred) {
long numRows = stats.getNumRows();
@@ -425,7 +430,8 @@ public class StatsRulesProcFactory {
}
private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
- AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+ AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+ FilterOperator fop) throws CloneNotSupportedException {
long numRows = stats.getNumRows();
@@ -434,7 +440,8 @@ public class StatsRulesProcFactory {
ExprNodeGenericFuncDesc genFunc = (ExprNodeGenericFuncDesc) child;
GenericUDF udf = genFunc.getGenericUDF();
- if (udf instanceof GenericUDFOPEqual || udf instanceof GenericUDFOPEqualNS) {
+ if (udf instanceof GenericUDFOPEqual ||
+ udf instanceof GenericUDFOPEqualNS) {
String colName = null;
String tabAlias = null;
boolean isConst = false;
@@ -506,13 +513,13 @@ public class StatsRulesProcFactory {
|| udf instanceof GenericUDFOPLessThan) {
return numRows / 3;
} else if (udf instanceof GenericUDFOPNotNull) {
- long newNumRows = evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+ long newNumRows = evaluateColEqualsNullExpr(stats, genFunc);
return stats.getNumRows() - newNumRows;
} else if (udf instanceof GenericUDFOPNull) {
- return evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+ return evaluateColEqualsNullExpr(stats, genFunc);
} else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
|| udf instanceof GenericUDFOPNot) {
- return evaluateExpression(stats, genFunc, aspCtx, neededCols);
+ return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop);
}
}
@@ -580,6 +587,23 @@ public class StatsRulesProcFactory {
Map<String, ExprNodeDesc> colExprMap = gop.getColumnExprMap();
RowSchema rs = gop.getSchema();
Statistics stats = null;
+ boolean mapSide = false;
+ int multiplier = mapSideParallelism;
+ long newNumRows;
+ long newDataSize;
+
+ // map side
+ if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
+ gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
+
+ mapSide = true;
+
+ // map-side grouping set present. if grouping set is present then
+ // multiply the number of rows by number of elements in grouping set
+ if (gop.getConf().isGroupingSetsPresent()) {
+ multiplier *= gop.getConf().getListGroupingSets().size();
+ }
+ }
try {
if (satisfyPrecondition(parentStats)) {
@@ -589,7 +613,6 @@ public class StatsRulesProcFactory {
StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
stats.setColumnStats(colStats);
long dvProd = 1;
- long newNumRows = 0;
// compute product of distinct values of grouping columns
for (ColStatistics cs : colStats) {
@@ -617,7 +640,7 @@ public class StatsRulesProcFactory {
}
// map side
- if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+ if (mapSide) {
// since we do not know if hash-aggregation will be enabled or disabled
// at runtime we will assume that map-side group by does not do any
@@ -626,14 +649,10 @@ public class StatsRulesProcFactory {
// map-side grouping set present. if grouping set is present then
// multiply the number of rows by number of elements in grouping set
if (gop.getConf().isGroupingSetsPresent()) {
- int multiplier = gop.getConf().getListGroupingSets().size();
-
- // take into account the map-side parallelism as well, default is 1
- multiplier *= mapSideParallelism;
- newNumRows = multiplier * stats.getNumRows();
- long dataSize = multiplier * stats.getDataSize();
+ newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
+ newDataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
stats.setNumRows(newNumRows);
- stats.setDataSize(dataSize);
+ stats.setDataSize(newDataSize);
for (ColStatistics cs : colStats) {
if (cs != null) {
long oldNumNulls = cs.getNumNulls();
@@ -644,29 +663,33 @@ public class StatsRulesProcFactory {
} else {
// map side no grouping set
- newNumRows = stats.getNumRows() * mapSideParallelism;
- updateStats(stats, newNumRows, true);
+ newNumRows = stats.getNumRows() * multiplier;
+ updateStats(stats, newNumRows, true, gop);
}
} else {
// reduce side
newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
- updateStats(stats, newNumRows, true);
+ updateStats(stats, newNumRows, true, gop);
}
} else {
if (parentStats != null) {
+ stats = parentStats.clone();
+
// worst case, in the absence of column statistics assume half the rows are emitted
- if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+ if (mapSide) {
// map side
- stats = parentStats.clone();
+ newNumRows = multiplier * stats.getNumRows();
+ newDataSize = multiplier * stats.getDataSize();
+ stats.setNumRows(newNumRows);
+ stats.setDataSize(newDataSize);
} else {
// reduce side
- stats = parentStats.clone();
- long newNumRows = parentStats.getNumRows() / 2;
- updateStats(stats, newNumRows, false);
+ newNumRows = parentStats.getNumRows() / 2;
+ updateStats(stats, newNumRows, false, gop);
}
}
}
@@ -700,7 +723,7 @@ public class StatsRulesProcFactory {
// only if the column stats is available, update the data size from
// the column stats
if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
- updateStats(stats, stats.getNumRows(), true);
+ updateStats(stats, stats.getNumRows(), true, gop);
}
}
@@ -709,7 +732,7 @@ public class StatsRulesProcFactory {
// rows will be 1
if (colExprMap.isEmpty()) {
stats.setNumRows(1);
- updateStats(stats, 1, true);
+ updateStats(stats, 1, true, gop);
}
}
@@ -817,6 +840,7 @@ public class StatsRulesProcFactory {
Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
Map<Integer, List<String>> joinKeys = Maps.newHashMap();
+ List<Long> rowCounts = Lists.newArrayList();
// get the join keys from parent ReduceSink operators
for (int pos = 0; pos < parents.size(); pos++) {
@@ -836,6 +860,7 @@ public class StatsRulesProcFactory {
for (String tabAlias : tableAliases) {
rowCountParents.put(tabAlias, parentStats.getNumRows());
}
+ rowCounts.add(parentStats.getNumRows());
// multi-attribute join key
if (keyExprs.size() > 1) {
@@ -936,22 +961,14 @@ public class StatsRulesProcFactory {
// update join statistics
stats.setColumnStats(outColStats);
- long newRowCount = computeNewRowCount(
- Lists.newArrayList(rowCountParents.values()), denom);
-
- if (newRowCount <= 0 && LOG.isDebugEnabled()) {
- newRowCount = 0;
- LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
- + " denominator or overflow might have occurred. Resetting row count to 0."
- + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
- }
+ long newRowCount = computeNewRowCount(rowCounts, denom);
- updateStatsForJoinType(stats, newRowCount, jop.getConf(),
- rowCountParents, outInTabAlias);
+ updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
+ outInTabAlias);
jop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
+ LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
}
} else {
@@ -979,14 +996,13 @@ public class StatsRulesProcFactory {
long maxDataSize = parentSizes.get(maxRowIdx);
long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
-
Statistics wcStats = new Statistics();
- wcStats.setNumRows(newNumRows);
- wcStats.setDataSize(newDataSize);
+ wcStats.setNumRows(setMaxIfInvalid(newNumRows));
+ wcStats.setDataSize(setMaxIfInvalid(newDataSize));
jop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+ LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
}
}
@@ -1008,8 +1024,15 @@ public class StatsRulesProcFactory {
}
private void updateStatsForJoinType(Statistics stats, long newNumRows,
- JoinDesc conf, Map<String, Long> rowCountParents,
+ CommonJoinOperator<? extends JoinDesc> jop,
+ Map<String, Long> rowCountParents,
Map<String, String> outInTabAlias) {
+
+ if (newNumRows <= 0) {
+ LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
+ + newNumRows + " rows will be set to Long.MAX_VALUE");
+ }
+ newNumRows = setMaxIfInvalid(newNumRows);
stats.setNumRows(newNumRows);
// scale down/up the column statistics based on the changes in number of
@@ -1040,7 +1063,7 @@ public class StatsRulesProcFactory {
stats.setColumnStats(colStats);
long newDataSize = StatsUtils
.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
}
private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1168,7 +1191,7 @@ public class StatsRulesProcFactory {
// if limit is greater than available rows then do not update
// statistics
if (limit <= parentStats.getNumRows()) {
- updateStats(stats, limit, true);
+ updateStats(stats, limit, true, lop);
}
lop.setStatistics(stats);
@@ -1185,8 +1208,8 @@ public class StatsRulesProcFactory {
long numRows = limit;
long avgRowSize = parentStats.getAvgRowSize();
long dataSize = avgRowSize * limit;
- wcStats.setNumRows(numRows);
- wcStats.setDataSize(dataSize);
+ wcStats.setNumRows(setMaxIfInvalid(numRows));
+ wcStats.setDataSize(setMaxIfInvalid(dataSize));
}
lop.setStatistics(wcStats);
@@ -1364,7 +1387,15 @@ public class StatsRulesProcFactory {
* @param useColStats
* - use column statistics to compute data size
*/
- static void updateStats(Statistics stats, long newNumRows, boolean useColStats) {
+ static void updateStats(Statistics stats, long newNumRows,
+ boolean useColStats, Operator<? extends OperatorDesc> op) {
+
+ if (newNumRows <= 0) {
+ LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
+ + newNumRows + " rows will be set to Long.MAX_VALUE");
+ }
+
+ newNumRows = setMaxIfInvalid(newNumRows);
long oldRowCount = stats.getNumRows();
double ratio = (double) newNumRows / (double) oldRowCount;
stats.setNumRows(newNumRows);
@@ -1389,10 +1420,10 @@ public class StatsRulesProcFactory {
}
stats.setColumnStats(colStats);
long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
} else {
long newDataSize = (long) (ratio * stats.getDataSize());
- stats.setDataSize(newDataSize);
+ stats.setDataSize(setMaxIfInvalid(newDataSize));
}
}
@@ -1401,4 +1432,13 @@ public class StatsRulesProcFactory {
&& !stats.getColumnStatsState().equals(Statistics.State.NONE);
}
+ /**
+ * A negative number of rows or data size is invalid; it is usually the
+ * result of long overflow, in which case return Long.MAX_VALUE.
+ * @param val - input value
+ * @return Long.MAX_VALUE if val is negative else val
+ */
+ static long setMaxIfInvalid(long val) {
+ return val < 0 ? Long.MAX_VALUE : val;
+ }
}
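
For illustration, here is a minimal standalone sketch (not part of this commit) of the failure mode the new setMaxIfInvalid guard addresses: multiplying two large positive longs in Java wraps silently to a negative value, which the guard then clamps to Long.MAX_VALUE. The row count and multiplier below are hypothetical.

    public class OverflowGuardSketch {
      static long setMaxIfInvalid(long val) {
        return val < 0 ? Long.MAX_VALUE : val;
      }

      public static void main(String[] args) {
        long numRows = 4000000000L;      // hypothetical stats row count
        long multiplier = 4000000000L;   // hypothetical grouping-set multiplier
        long product = numRows * multiplier;          // wraps past Long.MAX_VALUE
        System.out.println(product);                  // -2446744073709551616
        System.out.println(setMaxIfInvalid(product)); // 9223372036854775807
      }
    }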
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Mon Sep 8 04:38:17 2014
@@ -289,7 +289,8 @@ public abstract class BaseSemanticAnalyz
}
public static String getUnescapedName(ASTNode tableOrColumnNode, String currentDatabase) {
- if (tableOrColumnNode.getToken().getType() == HiveParser.TOK_TABNAME) {
+ int tokenType = tableOrColumnNode.getToken().getType();
+ if (tokenType == HiveParser.TOK_TABNAME) {
// table node
if (tableOrColumnNode.getChildCount() == 2) {
String dbName = unescapeIdentifier(tableOrColumnNode.getChild(0).getText());
@@ -301,6 +302,8 @@ public abstract class BaseSemanticAnalyz
return currentDatabase + "." + tableName;
}
return tableName;
+ } else if (tokenType == HiveParser.StringLiteral) {
+ return unescapeSQLString(tableOrColumnNode.getText());
}
// column node
return unescapeIdentifier(tableOrColumnNode.getText());
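
The practical effect of the new StringLiteral branch is that a quoted string can now stand in for a table name. A hedged illustration (the literal value is hypothetical, and unescapeSQLString is assumed to strip the enclosing quotes and resolve escape sequences):

    // Given an ASTNode whose token type is HiveParser.StringLiteral and whose
    // text is 'default.src' (quotes included), the new branch returns the
    // unquoted form rather than treating the text as an identifier:
    String name = getUnescapedName(node, currentDatabase); // -> default.src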
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Sep 8 04:38:17 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
/**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
*
*/
public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
@Override
- /*
- * (non-Javadoc)
- * we should ideally not modify the tree we traverse.
- * However, since we need to walk the tree at any time when we modify the
- * operator, we might as well do it here.
- */
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx, Object... nodeOutputs)
throws SemanticException {
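
With the merge/move/stats work handled elsewhere, the body of process reduces to bookkeeping. A plausible reconstruction (hedged; the actual body is not shown in this hunk) using the fileSinkSet that GenTezProcContext initializes later in this commit:

    @Override
    public Object process(Node nd, Stack<Node> stack,
        NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
      GenTezProcContext context = (GenTezProcContext) procCtx;
      FileSinkOperator fileSink = (FileSinkOperator) nd;
      // just remember the sink; merge, move and stats tasks are added later
      context.fileSinkSet.add(fileSink);
      return true;
    }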
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Sep 8 04:38:17 2014
@@ -26,29 +26,28 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
// remember which reducesinks we've already connected
public final Set<ReduceSinkOperator> connectedReduceSinks;
+ // remember the event operators we've seen
+ public final Set<AppMasterEventOperator> eventOperatorSet;
+
+ // remember the event operators we've abandoned.
+ public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+ // remember the connections between ts and event
+ public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+ this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
rootTasks.add(currentTask);
}
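
The three new collections are insertion-ordered bookkeeping for dynamic partition pruning. A hedged sketch of how tsToEventMap is presumably populated when an event operator is wired to its table scan (the surrounding traversal code is hypothetical):

    // one list of event operators per table scan, created on first use
    List<AppMasterEventOperator> events = context.tsToEventMap.get(ts);
    if (events == null) {
      events = new LinkedList<AppMasterEventOperator>();
      context.tsToEventMap.put(ts, events);
    }
    events.add(event);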
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Mon Sep 8 04:38:17 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
-import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
/**
* GenTezUtils is a collection of shared helper methods to produce
* TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
// min we allow tez to pick
- int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
+ int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
* minPartitionFactor));
minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
// max we allow tez to pick
- int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+ int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
reduceWork.setMinReduceTasks(minPartition);
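
To make the min/max computation concrete, a worked example with hypothetical values (these are illustrative, not the configured defaults): numReducers = 100, minPartitionFactor = 0.25, maxPartitionFactor = 2.0, maxReducers = 999.

    int minPartition = Math.max(1, (int) (100 * 0.25)); // 25
    minPartition = (25 > 999) ? 999 : 25;               // stays 25
    int maxPartition = (int) (100 * 2.0);               // 200
    maxPartition = (200 > 999) ? 999 : 200;             // stays 200
    // Tez may then auto-adjust reducer parallelism within [25, 200].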
@@ -177,10 +182,19 @@ public class GenTezUtils {
// map work starts with table scan operators
assert root instanceof TableScanOperator;
- String alias = ((TableScanOperator)root).getConf().getAlias();
+ TableScanOperator ts = (TableScanOperator) root;
+
+ String alias = ts.getConf().getAlias();
setupMapWork(mapWork, context, partitions, root, alias);
+ if (context.parseContext != null
+ && context.parseContext.getTopToTable() != null
+ && context.parseContext.getTopToTable().containsKey(ts)
+ && context.parseContext.getTopToTable().get(ts).isDummyTable()) {
+ mapWork.setDummyTableScan(true);
+ }
+
// add new item to the tez work
tezWork.add(mapWork);
@@ -201,18 +215,20 @@ public class GenTezUtils {
BaseWork work)
throws SemanticException {
- Set<Operator<?>> roots = work.getAllRootOperators();
+ List<Operator<?>> roots = new ArrayList<Operator<?>>();
+ roots.addAll(work.getAllRootOperators());
if (work.getDummyOps() != null) {
roots.addAll(work.getDummyOps());
}
+ roots.addAll(context.eventOperatorSet);
// need to clone the plan.
- Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+ List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
// we're cloning the operator plan but we're retaining the original work. That means
// that root operators have to be replaced with the cloned ops. The replacement map
// tells you what that mapping is.
- Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+ BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
// there's some special handling for dummyOps required. Mapjoins won't be properly
// initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -222,11 +238,35 @@ public class GenTezUtils {
Iterator<Operator<?>> it = newRoots.iterator();
for (Operator<?> orig: roots) {
Operator<?> newRoot = it.next();
+
+ replacementMap.put(orig, newRoot);
+
if (newRoot instanceof HashTableDummyOperator) {
- dummyOps.add((HashTableDummyOperator)newRoot);
+ // dummy ops need to be updated to the cloned ones.
+ dummyOps.add((HashTableDummyOperator) newRoot);
+ it.remove();
+ } else if (newRoot instanceof AppMasterEventOperator) {
+ // event operators point to table scan operators. When cloning these we
+ // need to restore the original scan.
+ if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+ TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+ if (ts == null) {
+ throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+ }
+ ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+ }
it.remove();
} else {
- replacementMap.put(orig,newRoot);
+ if (newRoot instanceof TableScanOperator) {
+ if (context.tsToEventMap.containsKey(orig)) {
+ // we need to update event operators with the cloned table scan
+ for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+ ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+ }
+ }
+ }
+ context.rootToWorkMap.remove(orig);
+ context.rootToWorkMap.put(newRoot, work);
}
}
@@ -263,6 +303,15 @@ public class GenTezUtils {
desc.setLinkedFileSinkDesc(linked);
}
+ if (current instanceof AppMasterEventOperator) {
+ // remember for additional processing later
+ context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+ // mark the original as abandoned. Don't need it anymore.
+ context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+ .get(current));
+ }
+
if (current instanceof UnionOperator) {
Operator<?> parent = null;
int count = 0;
@@ -328,4 +377,87 @@ public class GenTezUtils {
}
}
}
+
+ /**
+ * processAppMasterEvent sets up the event descriptor and the MapWork.
+ *
+ * @param procCtx
+ * @param event
+ */
+ public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+ if (procCtx.abandonedEventOperatorSet.contains(event)) {
+ // don't need this anymore
+ return;
+ }
+
+ DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+ TableScanOperator ts = eventDesc.getTableScan();
+
+ MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+ if (work == null) {
+ throw new AssertionError("No work found for tablescan " + ts);
+ }
+
+ BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+ if (enclosingWork == null) {
+ throw new AssertionError("Cannot find work for operator" + event);
+ }
+ String sourceName = enclosingWork.getName();
+
+ // store the vertex name in the operator pipeline
+ eventDesc.setVertexName(work.getName());
+ eventDesc.setInputName(work.getAliases().get(0));
+
+ // store table descriptor in map-work
+ if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+ work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+ }
+ List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+ tables.add(event.getConf().getTable());
+
+ // store column name in map-work
+ if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+ work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+ }
+ List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+ columns.add(eventDesc.getTargetColumnName());
+
+ // store partition key expr in map-work
+ if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+ work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+ }
+ List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+ keys.add(eventDesc.getPartKey());
+
+ }
+
+ /**
+ * getEnclosingWork finds the BaseWork any given operator belongs to.
+ */
+ public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+ List<Operator<?>> ops = new ArrayList<Operator<?>>();
+ findRoots(op, ops);
+ for (Operator<?> r : ops) {
+ BaseWork work = procCtx.rootToWorkMap.get(r);
+ if (work != null) {
+ return work;
+ }
+ }
+ return null;
+ }
+
+ /*
+ * findRoots collects (into ops) all root operators from which operator op is reachable
+ */
+ private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+ List<Operator<?>> parents = op.getParentOperators();
+ if (parents == null || parents.isEmpty()) {
+ ops.add(op);
+ return;
+ }
+ for (Operator<?> p : parents) {
+ findRoots(p, ops);
+ }
+ }
}
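
As a closing illustration, findRoots is a plain upward depth-first walk: it climbs parent edges until it reaches operators with no parents. A self-contained analogue on a toy node type (not Hive's Operator) shows the shape; note that in a diamond-shaped DAG the same root can be collected twice, which getEnclosingWork tolerates because it returns the first root with a mapped BaseWork:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ToyNode {
      final String name;
      final List<ToyNode> parents;

      ToyNode(String name, ToyNode... parents) {
        this.name = name;
        this.parents = Arrays.asList(parents);
      }

      // same shape as findRoots: recurse to parents, collect parentless nodes
      static void findRoots(ToyNode op, List<ToyNode> ops) {
        if (op.parents.isEmpty()) {
          ops.add(op);
          return;
        }
        for (ToyNode p : op.parents) {
          findRoots(p, ops);
        }
      }

      public static void main(String[] args) {
        ToyNode ts1 = new ToyNode("TS_1");
        ToyNode ts2 = new ToyNode("TS_2");
        ToyNode join = new ToyNode("JOIN_3", ts1, ts2);
        List<ToyNode> roots = new ArrayList<ToyNode>();
        findRoots(join, roots);
        System.out.println(roots.size()); // 2 (TS_1 and TS_2)
      }
    }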
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java Mon Sep 8 04:38:17 2014
@@ -445,7 +445,7 @@ public class ImportSemanticAnalyzer exte
* substitute OutputFormat name based on HiveFileFormatUtils.outputFormatSubstituteMap
*/
try {
- Class<?> origin = Class.forName(importedofc, true, JavaUtils.getClassLoader());
+ Class<?> origin = Class.forName(importedofc, true, Utilities.getSessionSpecifiedClassLoader());
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
.getOutputFormatSubstitute(origin,false);
if (replaced == null) {