Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC

svn commit: r1623263 [21/28] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Mon Sep  8 04:38:17 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Sep  8 04:38:17 2014
@@ -96,9 +96,6 @@ class WriterImpl implements Writer, Memo
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
-  // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
-  private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
-
   // threshold above which buffer size will be automatically resized
   private static final int COLUMN_COUNT_THRESHOLD = 1000;
 
@@ -135,8 +132,6 @@ class WriterImpl implements Writer, Memo
     new TreeMap<String, ByteString>();
   private final StreamFactory streamFactory = new StreamFactory();
   private final TreeWriter treeWriter;
-  private final OrcProto.RowIndex.Builder rowIndex =
-      OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
   private final OrcFile.Version version;
@@ -678,7 +673,7 @@ class WriterImpl implements Writer, Memo
       if (rowIndexStream != null) {
         if (rowIndex.getEntryCount() != requiredIndexEntries) {
           throw new IllegalArgumentException("Column has wrong number of " +
-               "index entries found: " + rowIndexEntry + " expected: " +
+               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
                requiredIndexEntries);
         }
         rowIndex.build().writeTo(rowIndexStream);
@@ -1005,6 +1000,8 @@ class WriterImpl implements Writer, Memo
     private final float dictionaryKeySizeThreshold;
     private boolean useDictionaryEncoding = true;
     private boolean isDirectV2 = true;
+    private boolean doneDictionaryCheck;
+    private final boolean strideDictionaryCheck;
 
     StringTreeWriter(int columnId,
                      ObjectInspector inspector,
@@ -1025,9 +1022,14 @@ class WriterImpl implements Writer, Memo
       directLengthOutput = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
-        HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
-        HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
-          defaultFloatVal);
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
+              defaultFloatVal);
+      strideDictionaryCheck = writer.getConfiguration().getBoolean(
+          HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname,
+          HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.
+            defaultBoolVal);
+      doneDictionaryCheck = false;
     }
 
     /**
@@ -1045,21 +1047,71 @@ class WriterImpl implements Writer, Memo
       super.write(obj);
       if (obj != null) {
         Text val = getTextValue(obj);
-        rows.add(dictionary.add(val));
+        if (useDictionaryEncoding || !strideDictionaryCheck) {
+          rows.add(dictionary.add(val));
+        } else {
+          // write data and length
+          directStreamOutput.write(val.getBytes(), 0, val.getLength());
+          directLengthOutput.write(val.getLength());
+        }
         indexStatistics.updateString(val);
       }
     }
 
+    private boolean checkDictionaryEncoding() {
+      if (!doneDictionaryCheck) {
+        // Set the flag indicating whether or not to use dictionary encoding
+        // based on whether or not the fraction of distinct keys over number of
+        // non-null rows is less than the configured threshold
+        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+        doneDictionaryCheck = true;
+      }
+      return useDictionaryEncoding;
+    }
+
     @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
-      // Set the flag indicating whether or not to use dictionary encoding
-      // based on whether or not the fraction of distinct keys over number of
-      // non-null rows is less than the configured threshold
-      useDictionaryEncoding =
-        (!isDirectV2) || (rows.size() > 0 &&
-                          (float)(dictionary.size()) / rows.size() <=
-                            dictionaryKeySizeThreshold);
+      // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
+      // checking would not have happened. So do it again here.
+      checkDictionaryEncoding();
+
+      if (useDictionaryEncoding) {
+        flushDictionary();
+      } else {
+        // flushout any left over entries from dictionary
+        if (rows.size() > 0) {
+          flushDictionary();
+        }
+
+        // suppress the stream for every stripe if dictionary is disabled
+        stringOutput.suppress();
+      }
+
+      // we need to build the rowindex before calling super, since it
+      // writes it out.
+      super.writeStripe(builder, requiredIndexEntries);
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+      directStreamOutput.flush();
+      directLengthOutput.flush();
+      // reset all of the fields to be ready for the next stripe.
+      dictionary.clear();
+      savedRowIndex.clear();
+      rowIndexValueCount.clear();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+
+      if (!useDictionaryEncoding) {
+        // record the start positions of first index stride of next stripe i.e
+        // beginning of the direct streams when dictionary is disabled
+        recordDirectStreamPosition();
+      }
+    }
+
+    private void flushDictionary() throws IOException {
       final int[] dumpOrder = new int[dictionary.size()];
 
       if (useDictionaryEncoding) {
@@ -1113,21 +1165,7 @@ class WriterImpl implements Writer, Memo
           }
         }
       }
-      // we need to build the rowindex before calling super, since it
-      // writes it out.
-      super.writeStripe(builder, requiredIndexEntries);
-      stringOutput.flush();
-      lengthOutput.flush();
-      rowOutput.flush();
-      directStreamOutput.flush();
-      directLengthOutput.flush();
-      // reset all of the fields to be ready for the next stripe.
-      dictionary.clear();
       rows.clear();
-      savedRowIndex.clear();
-      rowIndexValueCount.clear();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
     }
 
     @Override
@@ -1165,10 +1203,30 @@ class WriterImpl implements Writer, Memo
       OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
       rowIndexEntry.setStatistics(indexStatistics.serialize());
       indexStatistics.reset();
-      savedRowIndex.add(rowIndexEntry.build());
+      OrcProto.RowIndexEntry base = rowIndexEntry.build();
+      savedRowIndex.add(base);
       rowIndexEntry.clear();
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(Long.valueOf(rows.size()));
+      if (strideDictionaryCheck) {
+        checkDictionaryEncoding();
+      }
+      if (!useDictionaryEncoding) {
+        if (rows.size() > 0) {
+          flushDictionary();
+          // just record the start positions of next index stride
+          recordDirectStreamPosition();
+        } else {
+          // record the start positions of next index stride
+          recordDirectStreamPosition();
+          getRowIndex().addEntry(base);
+        }
+      }
+    }
+
+    private void recordDirectStreamPosition() throws IOException {
+      directStreamOutput.getPosition(rowIndexPosition);
+      directLengthOutput.getPosition(rowIndexPosition);
     }
 
     @Override

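Annotation: the StringTreeWriter changes above defer the dictionary-versus-direct decision to the first row-index stride (or stripe end) and base it on the ratio of distinct dictionary keys to non-null rows against a configured threshold. The following is a minimal, self-contained sketch of that decision rule only; the class and field names are illustrative and are not the ORC WriterImpl internals.

    // Illustrative sketch of a one-shot, ratio-based dictionary-encoding check.
    // Hypothetical names; not the ORC WriterImpl API.
    class DictionaryCheck {
      private final float keySizeThreshold;   // mirrors the configured threshold, e.g. 0.8f
      private boolean done = false;
      private boolean useDictionary = true;

      DictionaryCheck(float keySizeThreshold) {
        this.keySizeThreshold = keySizeThreshold;
      }

      /** Decide once, from distinct keys vs. non-null rows seen so far. */
      boolean check(int distinctKeys, int nonNullRows) {
        if (!done) {
          float ratio = nonNullRows > 0 ? (float) distinctKeys / nonNullRows : 0.0f;
          useDictionary = ratio <= keySizeThreshold;
          done = true;
        }
        return useDictionary;
      }
    }

Once such a check decides against the dictionary, later values can bypass it entirely, which is what the new branch in write() does by writing directly to the data and length streams.
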
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java Mon Sep  8 04:38:17 2014
@@ -95,7 +95,7 @@ public class HiveSchemaConverter {
         int scale = decimalTypeInfo.scale();
         int bytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
         return Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(bytes).as(OriginalType.DECIMAL).
-        		scale(scale).precision(prec).named(name);
+            scale(scale).precision(prec).named(name);
       } else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) {
         throw new UnsupportedOperationException("Unknown type not implemented");
       } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Mon Sep  8 04:38:17 2014
@@ -140,7 +140,7 @@ public class DataWritableReadSupport ext
               throw new IllegalStateException(msg);
             }
           }
-	}
+        }
       }
       requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
               typeListWanted), fileSchema, configuration);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Mon Sep  8 04:38:17 2014
@@ -27,6 +27,8 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java Mon Sep  8 04:38:17 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.metadata.*;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -38,7 +39,7 @@ public class EmbeddedLockManager impleme
 
   private HiveLockManagerCtx ctx;
 
-  private int sleepTime = 1000;
+  private long sleepTime = 1000;
   private int numRetriesForLock = 0;
   private int numRetriesForUnLock = 0;
 
@@ -82,12 +83,13 @@ public class EmbeddedLockManager impleme
 
   public void refresh() {
     HiveConf conf = ctx.getConf();
-    sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+    sleepTime = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
     numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
   }
 
-  public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, int sleepTime)
+  public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, long sleepTime)
       throws LockException {
     for (int i = 0; i <= numRetriesForLock; i++) {
       if (i > 0) {
@@ -101,7 +103,7 @@ public class EmbeddedLockManager impleme
     return null;
   }
 
-  private void sleep(int sleepTime) {
+  private void sleep(long sleepTime) {
     try {
       Thread.sleep(sleepTime);
     } catch (InterruptedException e) {
@@ -109,7 +111,7 @@ public class EmbeddedLockManager impleme
     }
   }
 
-  public List<HiveLock> lock(List<HiveLockObj> objs, int numRetriesForLock, int sleepTime)
+  public List<HiveLock> lock(List<HiveLockObj> objs, int numRetriesForLock, long sleepTime)
       throws LockException {
     sortLocks(objs);
     for (int i = 0; i <= numRetriesForLock; i++) {
@@ -132,7 +134,7 @@ public class EmbeddedLockManager impleme
   }
 
   private List<HiveLock> lockPrimitive(List<HiveLockObj> objs, int numRetriesForLock,
-      int sleepTime) throws LockException {
+      long sleepTime) throws LockException {
     List<HiveLock> locks = new ArrayList<HiveLock>();
     for (HiveLockObj obj : objs) {
       HiveLock lock = lockPrimitive(obj.getObj(), obj.getMode());
@@ -164,7 +166,7 @@ public class EmbeddedLockManager impleme
     });
   }
 
-  public void unlock(HiveLock hiveLock, int numRetriesForUnLock, int sleepTime)
+  public void unlock(HiveLock hiveLock, int numRetriesForUnLock, long sleepTime)
       throws LockException {
     String[] paths = hiveLock.getHiveLockObject().getPaths();
     HiveLockObjectData data = hiveLock.getHiveLockObject().getData();
@@ -179,7 +181,7 @@ public class EmbeddedLockManager impleme
     throw new LockException("Failed to release lock " + hiveLock);
   }
 
-  public void releaseLocks(List<HiveLock> hiveLocks, int numRetriesForUnLock, int sleepTime) {
+  public void releaseLocks(List<HiveLock> hiveLocks, int numRetriesForUnLock, long sleepTime) {
     for (HiveLock locked : hiveLocks) {
       try {
         unlock(locked, numRetriesForUnLock, sleepTime);

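Annotation: both lock managers in this patch switch from multiplying an int seconds value by 1000 to reading the retry interval with conf.getTimeVar(..., TimeUnit.MILLISECONDS) and carrying it as a long. A small sketch of the resulting shape follows, assuming a generic lock primitive supplied as a Callable (hypothetical; not the HiveLockManager API).

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    // Sketch: normalize a configured duration to long milliseconds once,
    // then reuse it in a retry loop.
    class RetryingLock {
      private final long sleepTimeMs;
      private final int numRetries;

      RetryingLock(long sleepTime, TimeUnit unit, int numRetries) {
        this.sleepTimeMs = unit.toMillis(sleepTime);   // e.g. 60, SECONDS -> 60000L
        this.numRetries = numRetries;
      }

      /** tryOnce is a hypothetical lock primitive; returns null on conflict. */
      Object lock(Callable<Object> tryOnce) throws Exception {
        for (int i = 0; i <= numRetries; i++) {
          if (i > 0) {
            Thread.sleep(sleepTimeMs);   // long millis, no int truncation
          }
          Object acquired = tryOnce.call();
          if (acquired != null) {
            return acquired;
          }
        }
        return null;
      }
    }
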
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Mon Sep  8 04:38:17 2014
@@ -37,6 +37,7 @@ import org.apache.zookeeper.ZooKeeper;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -53,7 +54,7 @@ public class ZooKeeperHiveLockManager im
   private int sessionTimeout;
   private String quorumServers;
 
-  private int sleepTime;
+  private long sleepTime;
   private int numRetriesForLock;
   private int numRetriesForUnLock;
 
@@ -106,7 +107,8 @@ public class ZooKeeperHiveLockManager im
     sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
     quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
 
-    sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+    sleepTime = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
     numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
 
@@ -132,7 +134,8 @@ public class ZooKeeperHiveLockManager im
   @Override
   public void refresh() {
     HiveConf conf = ctx.getConf();
-    sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+    sleepTime = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
     numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
   }
@@ -268,7 +271,7 @@ public class ZooKeeperHiveLockManager im
    * @param mode
    *          The mode of the lock
    * @param keepAlive
-   *          Whether the lock is to be persisted after the statement Acuire the
+   *          Whether the lock is to be persisted after the statement Acquire the
    *          lock. Return null if a conflicting lock is present.
    **/
   public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
@@ -515,8 +518,8 @@ public class ZooKeeperHiveLockManager im
     try {
       int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
       String quorumServers = getQuorumServers(conf);
-      Watcher dummWatcher = new DummyWatcher();
-      zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummWatcher);
+      Watcher dummyWatcher = new DummyWatcher();
+      zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
       String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
       List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
       Exception lastExceptionGot = null;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Sep  8 04:38:17 2014
@@ -109,6 +109,7 @@ import org.apache.hadoop.hive.shims.Hado
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 
 import com.google.common.collect.Sets;
@@ -128,6 +129,7 @@ public class Hive {
 
   private HiveConf conf = null;
   private IMetaStoreClient metaStoreClient;
+  private UserGroupInformation owner;
 
   private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
     @Override
@@ -181,7 +183,11 @@ public class Hive {
    */
   public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
     Hive db = hiveDB.get();
-    if (db == null || needsRefresh) {
+    if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
+      if (db != null) {
+        LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
+          ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+      }
       closeCurrent();
       c.set("fs.scheme.class", "dfs");
       Hive newdb = new Hive(c);
@@ -194,6 +200,11 @@ public class Hive {
 
   public static Hive get() throws HiveException {
     Hive db = hiveDB.get();
+    if (db != null && !db.isCurrentUserOwner()) {
+      LOG.debug("Creating new db. db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+      db.close();
+      db = null;
+    }
     if (db == null) {
       SessionState session = SessionState.get();
       db = new Hive(session == null ? new HiveConf(Hive.class) : session.getConf());
@@ -220,6 +231,17 @@ public class Hive {
     conf = c;
   }
 
+
+  private boolean isCurrentUserOwner() throws HiveException {
+    try {
+      return owner == null || owner.equals(UserGroupInformation.getCurrentUser());
+    } catch(IOException e) {
+      throw new HiveException("Error getting current user: " + e.getMessage(), e);
+    }
+  }
+
+
+
   /**
    * closes the connection to metastore for the calling thread
    */
@@ -2496,6 +2518,13 @@ private void constructOneLBLocationMap(F
   @Unstable
   public IMetaStoreClient getMSC() throws MetaException {
     if (metaStoreClient == null) {
+      try {
+        owner = UserGroupInformation.getCurrentUser();
+      } catch(IOException e) {
+        String msg = "Error getting current user: " + e.getMessage();
+        LOG.error(msg, e);
+        throw new MetaException(msg + "\n" + StringUtils.stringifyException(e));
+      }
       metaStoreClient = createMetaStoreClient();
     }
     return metaStoreClient;

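Annotation: Hive.get() now records which UserGroupInformation created the cached thread-local instance and discards that instance when the current user no longer matches (for example after a doAs switch). A compact sketch of the pattern, with a hypothetical OwnedSession type standing in for the Hive object:

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch: invalidate a per-thread cached session when the current UGI
    // differs from the user that created it. OwnedSession is hypothetical.
    class OwnedSession {
      private final UserGroupInformation owner;

      OwnedSession() throws IOException {
        this.owner = UserGroupInformation.getCurrentUser();
      }

      boolean isCurrentUserOwner() throws IOException {
        return owner.equals(UserGroupInformation.getCurrentUser());
      }
    }

    class SessionCache {
      private static final ThreadLocal<OwnedSession> CACHE = new ThreadLocal<OwnedSession>();

      static OwnedSession get() throws IOException {
        OwnedSession s = CACHE.get();
        if (s != null && !s.isCurrentUserOwner()) {
          CACHE.remove();   // drop the stale instance, analogous to db.close() above
          s = null;
        }
        if (s == null) {
          s = new OwnedSession();
          CACHE.set(s);
        }
        return s;
      }
    }
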
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java Mon Sep  8 04:38:17 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator;
 import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
@@ -307,7 +308,7 @@ public final class HiveUtils {
     try {
       Class<? extends HiveStorageHandler> handlerClass =
         (Class<? extends HiveStorageHandler>)
-        Class.forName(className, true, JavaUtils.getClassLoader());
+        Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
       HiveStorageHandler storageHandler = ReflectionUtils.newInstance(handlerClass, conf);
       return storageHandler;
     } catch (ClassNotFoundException e) {
@@ -329,7 +330,7 @@ public final class HiveUtils {
     try {
       Class<? extends HiveIndexHandler> handlerClass =
         (Class<? extends HiveIndexHandler>)
-        Class.forName(indexHandlerClass, true, JavaUtils.getClassLoader());
+        Class.forName(indexHandlerClass, true, Utilities.getSessionSpecifiedClassLoader());
       HiveIndexHandler indexHandler = ReflectionUtils.newInstance(handlerClass, conf);
       return indexHandler;
     } catch (ClassNotFoundException e) {

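Annotation: several call sites in this commit (HiveUtils, Partition, Table, ConstantPropagateProcFactory) switch from JavaUtils.getClassLoader() to Utilities.getSessionSpecifiedClassLoader() so that classes added in the current session are visible when handlers are instantiated. The underlying mechanism is just Class.forName with an explicit loader; the sketch below uses a hypothetical resolveLoader() fallback chain and is not the Hive API.

    // Sketch: load a handler class through an explicitly chosen class loader
    // instead of the JVM default. resolveLoader() is hypothetical.
    class HandlerLoader {
      static ClassLoader resolveLoader(ClassLoader sessionLoader) {
        if (sessionLoader != null) {
          return sessionLoader;   // e.g. a loader that includes session-added jars
        }
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        return tccl != null ? tccl : HandlerLoader.class.getClassLoader();
      }

      static <T> T newInstance(String className, Class<T> expected, ClassLoader sessionLoader)
          throws ReflectiveOperationException {
        Class<?> c = Class.forName(className, true, resolveLoader(sessionLoader));
        return expected.cast(c.getDeclaredConstructor().newInstance());
      }
    }
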
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Mon Sep  8 04:38:17 2014
@@ -302,7 +302,7 @@ public class Partition implements Serial
       }
       try {
         inputFormatClass = ((Class<? extends InputFormat>) Class.forName(clsName, true,
-            JavaUtils.getClassLoader()));
+            Utilities.getSessionSpecifiedClassLoader()));
       } catch (ClassNotFoundException e) {
         throw new HiveException("Class not found: " + clsName, e);
       }
@@ -322,7 +322,7 @@ public class Partition implements Serial
       }
       try {
         Class<?> c = (Class.forName(clsName, true,
-            JavaUtils.getClassLoader()));
+            Utilities.getSessionSpecifiedClassLoader()));
         // Replace FileOutputFormat for backward compatibility
         if (!HiveOutputFormat.class.isAssignableFrom(c)) {
           outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c,false);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Mon Sep  8 04:38:17 2014
@@ -13,6 +13,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.FileUtils;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Mon Sep  8 04:38:17 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -293,7 +294,7 @@ public class Table implements Serializab
           inputFormatClass = getStorageHandler().getInputFormatClass();
         } else {
           inputFormatClass = (Class<? extends InputFormat>)
-            Class.forName(className, true, JavaUtils.getClassLoader());
+            Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
         }
       } catch (ClassNotFoundException e) {
         throw new RuntimeException(e);
@@ -329,7 +330,7 @@ public class Table implements Serializab
             }
             else {
               c = Class.forName(className, true,
-                  JavaUtils.getClassLoader());
+                  Utilities.getSessionSpecifiedClassLoader());
             }
         }
         if (!HiveOutputFormat.class.isAssignableFrom(c)) {
@@ -677,7 +678,7 @@ public class Table implements Serializab
     }
     try {
       setInputFormatClass((Class<? extends InputFormat<WritableComparable, Writable>>) Class
-          .forName(name, true, JavaUtils.getClassLoader()));
+          .forName(name, true, Utilities.getSessionSpecifiedClassLoader()));
     } catch (ClassNotFoundException e) {
       throw new HiveException("Class not found: " + name, e);
     }
@@ -690,7 +691,7 @@ public class Table implements Serializab
       return;
     }
     try {
-      Class<?> origin = Class.forName(name, true, JavaUtils.getClassLoader());
+      Class<?> origin = Class.forName(name, true, Utilities.getSessionSpecifiedClassLoader());
       setOutputFormatClass(HiveFileFormatUtils
           .getOutputFormatSubstitute(origin,false));
     } catch (ClassNotFoundException e) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Mon Sep  8 04:38:17 2014
@@ -369,7 +369,7 @@ public final class ColumnPrunerProcFacto
         return null;
       }
       cols = cols == null ? new ArrayList<String>() : cols;
-      
+
       cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
           cols);
       RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
@@ -479,13 +479,13 @@ public final class ColumnPrunerProcFacto
           flags[index] = true;
           colLists = Utilities.mergeUniqElems(colLists, valCols.get(index).getCols());
         }
-        
+
         Collections.sort(colLists);
         pruneReduceSinkOperator(flags, op, cppCtx);
         cppCtx.getPrunedColLists().put(op, colLists);
         return null;
       }
-      
+
       // Reduce Sink contains the columns needed - no need to aggregate from
       // children
       for (ExprNodeDesc val : valCols) {
@@ -519,7 +519,7 @@ public final class ColumnPrunerProcFacto
       if (cols == null) {
         return null;
       }
-      
+
       Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
       // As columns go down the DAG, the LVJ will transform internal column
       // names from something like 'key' to '_col0'. Because of this, we need
@@ -604,8 +604,8 @@ public final class ColumnPrunerProcFacto
         Object... nodeOutputs) throws SemanticException {
       SelectOperator op = (SelectOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
-      
-      
+
+
       if (op.getChildOperators() != null) {
         for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
           // UDTF is not handled yet, so the parent SelectOp of UDTF should just assume
@@ -858,11 +858,11 @@ public final class ColumnPrunerProcFacto
     if (inputSchema != null) {
       ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
       ArrayList<ColumnInfo> inputCols = inputSchema.getSignature();
-    	for (ColumnInfo i: inputCols) {
+      for (ColumnInfo i: inputCols) {
         if (cols.contains(i.getInternalName())) {
           rs.add(i);
         }
-    	}
+      }
       op.getSchema().setSignature(rs);
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Mon Sep  8 04:38:17 2014
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -29,7 +29,6 @@ import java.util.Stack;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Ro
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -100,7 +100,7 @@ public final class ConstantPropagateProc
 
   /**
    * Get ColumnInfo from column expression.
-   * 
+   *
    * @param rr
    * @param desc
    * @return
@@ -139,7 +139,7 @@ public final class ConstantPropagateProc
 
   /**
    * Cast type from expression type to expected type ti.
-   * 
+   *
    * @param desc constant expression
    * @param ti expected type info
    * @return cast constant, or null if the type cast failed.
@@ -189,10 +189,10 @@ public final class ConstantPropagateProc
 
   /**
    * Fold input expression desc.
-   * 
+   *
    * If desc is a UDF and all parameters are constants, evaluate it. If desc is a column expression,
    * find it from propagated constants, and if there is, replace it with constant.
-   * 
+   *
    * @param desc folding expression
    * @param constants current propagated constant map
    * @param cppCtx
@@ -275,7 +275,7 @@ public final class ConstantPropagateProc
       String udfClassName = bridge.getUdfClassName();
       try {
         UDF udfInternal =
-            (UDF) Class.forName(bridge.getUdfClassName(), true, JavaUtils.getClassLoader())
+            (UDF) Class.forName(bridge.getUdfClassName(), true, Utilities.getSessionSpecifiedClassLoader())
                 .newInstance();
         files = udfInternal.getRequiredFiles();
         jars = udf.getRequiredJars();
@@ -296,7 +296,7 @@ public final class ConstantPropagateProc
 
   /**
    * Propagate assignment expression, adding an entry into constant map constants.
-   * 
+   *
    * @param udf expression UDF, currently only 2 UDFs are supported: '=' and 'is null'.
    * @param newExprs child expressions (parameters).
    * @param cppCtx
@@ -350,7 +350,7 @@ public final class ConstantPropagateProc
           ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
           if (Boolean.TRUE.equals(c.getValue())) {
 
-        	  // if true, prune it
+            // if true, prune it
             return newExprs.get(Math.abs(i - 1));
           } else {
 
@@ -384,7 +384,7 @@ public final class ConstantPropagateProc
 
   /**
    * Evaluate column, replace the deterministic columns with constants if possible
-   * 
+   *
    * @param desc
    * @param ctx
    * @param op
@@ -435,7 +435,7 @@ public final class ConstantPropagateProc
 
   /**
    * Evaluate UDF
-   * 
+   *
    * @param udf UDF object
    * @param exprs
    * @param oldExprs
@@ -512,7 +512,7 @@ public final class ConstantPropagateProc
 
   /**
    * Change operator row schema, replace column with constant if it is.
-   * 
+   *
    * @param op
    * @param constants
    * @throws SemanticException
@@ -584,7 +584,7 @@ public final class ConstantPropagateProc
 
   /**
    * Factory method to get the ConstantPropagateFilterProc class.
-   * 
+   *
    * @return ConstantPropagateFilterProc
    */
   public static ConstantPropagateFilterProc getFilterProc() {
@@ -621,7 +621,7 @@ public final class ConstantPropagateProc
 
   /**
    * Factory method to get the ConstantPropagateGroupByProc class.
-   * 
+   *
    * @return ConstantPropagateGroupByProc
    */
   public static ConstantPropagateGroupByProc getGroupByProc() {
@@ -650,7 +650,7 @@ public final class ConstantPropagateProc
 
   /**
    * Factory method to get the ConstantPropagateDefaultProc class.
-   * 
+   *
    * @return ConstantPropagateDefaultProc
    */
   public static ConstantPropagateDefaultProc getDefaultProc() {
@@ -683,7 +683,7 @@ public final class ConstantPropagateProc
 
   /**
    * The Factory method to get the ConstantPropagateSelectProc class.
-   * 
+   *
    * @return ConstantPropagateSelectProc
    */
   public static ConstantPropagateSelectProc getSelectProc() {
@@ -877,7 +877,7 @@ public final class ConstantPropagateProc
         return null;
       }
 
-      // Note: the following code (removing folded constants in exprs) is deeply coupled with 
+      // Note: the following code (removing folded constants in exprs) is deeply coupled with
       //    ColumnPruner optimizer.
       // Assuming ColumnPrunner will remove constant columns so we don't deal with output columns.
       //    Except one case that the join operator is followed by a redistribution (RS operator).
@@ -941,12 +941,12 @@ public final class ConstantPropagateProc
         return null;
       }
 
-      List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
-      for (ExprNodeDesc expr : pred.getChildren()) {
-        ExprNodeDesc constant = foldExpr(expr, constants, cppCtx, op, 0, false);
-        newChildren.add(constant);
+      ExprNodeDesc constant = foldExpr(pred, constants, cppCtx, op, 0, false);
+      if (constant instanceof ExprNodeGenericFuncDesc) {
+        conf.setFilterExpr((ExprNodeGenericFuncDesc) constant);
+      } else {
+        conf.setFilterExpr(null);
       }
-      pred.setChildren(newChildren);
       return null;
     }
   }

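Annotation: the rule above now folds the whole predicate once instead of folding each child and re-attaching it, and keeps the folded result as the operator's filter expression only if it is still a generic function; otherwise the filter expression is cleared. A tiny sketch of that fold-then-keep-or-clear shape, using hypothetical Expr types in place of ExprNodeDesc and friends:

    // Sketch of fold-then-keep-or-clear. Expr/FuncExpr/ConstExpr are hypothetical
    // stand-ins for ExprNodeDesc, ExprNodeGenericFuncDesc, ExprNodeConstantDesc.
    abstract class Expr {}
    class FuncExpr extends Expr {}
    class ConstExpr extends Expr {
      final Object value;
      ConstExpr(Object value) { this.value = value; }
    }

    class FilterHolder {
      private FuncExpr filterExpr;

      /** fold() would evaluate constant subtrees; here it is only a hook. */
      Expr fold(Expr pred) { return pred; }

      void updateFilter(Expr pred) {
        Expr folded = fold(pred);
        if (folded instanceof FuncExpr) {
          filterExpr = (FuncExpr) folded;   // still a real predicate: keep it
        } else {
          filterExpr = null;                // folded away: nothing left to push down
        }
      }

      FuncExpr getFilterExpr() { return filterExpr; }
    }
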
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Sep  8 04:38:17 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -363,6 +366,19 @@ public class ConvertJoinMapJoin implemen
     Operator<? extends OperatorDesc> parentBigTableOp
       = mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
+      for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+        // we might have generated a dynamic partition operator chain. Since
+        // we're removing the reduce sink we need do remove that too.
+        Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
+        for (Operator<?> c : p.getChildOperators()) {
+          if (hasDynamicPartitionBroadcast(c)) {
+            dynamicPartitionOperators.add(c);
+          }
+        }
+        for (Operator<?> c : dynamicPartitionOperators) {
+          p.removeChild(c);
+        }
+      }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(
               parentBigTableOp.getParentOperators().get(0)))) {
@@ -380,4 +396,16 @@ public class ConvertJoinMapJoin implemen
 
     return mapJoinOp;
   }
+
+  private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+    if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+      return true;
+    }
+    for (Operator<?> c : op.getChildOperators()) {
+      if (hasDynamicPartitionBroadcast(c)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

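Annotation: the new block above collects the dynamic-partition-pruning children into a HashSet first and removes them afterwards, so the parent's child list is never mutated while it is being iterated. The same collect-then-remove idiom in isolation (Node is a hypothetical operator type, not the Hive Operator class):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Sketch of collect-then-remove to avoid ConcurrentModificationException.
    class Node {
      final List<Node> children = new ArrayList<Node>();
      final boolean flagged;
      Node(boolean flagged) { this.flagged = flagged; }
    }

    class Pruner {
      static void pruneFlaggedChildren(Node parent) {
        Set<Node> toRemove = new HashSet<Node>();
        for (Node child : parent.children) {   // read-only pass over the live list
          if (child.flagged) {
            toRemove.add(child);
          }
        }
        parent.children.removeAll(toRemove);   // mutate only after iteration ends
      }
    }
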
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Sep  8 04:38:17 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
 import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+import org.apache.hadoop.hive.ql.ppd.SyntheticJoinPredicate;
 
 /**
  * Implementation of the optimizer.
@@ -55,6 +56,7 @@ public class Optimizer {
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
       transformations.add(new PredicateTransitivePropagate());
+      transformations.add(new SyntheticJoinPredicate());
       transformations.add(new PredicatePushDown());
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());
@@ -125,9 +127,9 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain() ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+    if (pctx.getContext().getExplain()
+        && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+        && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }

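Annotation: the guard around AnnotateWithStatistics/AnnotateWithOpTraits is inverted relative to the old code. Previously the transformations ran for explain plans or whenever the engine was tez or spark; now they run only for explain plans and only when the engine is neither tez nor spark. Spelled out as plain booleans (the parameters stand in for the HiveConf lookups):

    // Sketch of the guard before and after the patch.
    class StatsAnnotationGuard {
      static boolean oldRule(boolean explain, String engine) {
        return explain || "tez".equals(engine) || "spark".equals(engine);
      }

      static boolean newRule(boolean explain, String engine) {
        return explain && !"tez".equals(engine) && !"spark".equals(engine);
      }
    }
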
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java Mon Sep  8 04:38:17 2014
@@ -186,8 +186,7 @@ public abstract class PrunerExpressionOp
         return ((ExprNodeNullDesc) nd).clone();
       }
 
-      assert (false);
-      return null;
+      return new ExprNodeConstantDesc(((ExprNodeDesc)nd).getTypeInfo(), null);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Mon Sep  8 04:38:17 2014
@@ -26,8 +26,6 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hive.ql.parse.G
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -134,7 +131,8 @@ public class ReduceSinkMapJoinProc imple
         String prefix = Utilities.ReduceField.KEY.toString();
         for (String keyCol : keyCols) {
           ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol);
-          ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol);
+          ColStatistics cs =
+              StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol);
           if (cs == null || cs.getCountDistint() <= 0) {
             maxKeyCount = Long.MAX_VALUE;
             break;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Mon Sep  8 04:38:17 2014
@@ -353,14 +353,14 @@ public class OpProcFactory {
           if (inpOp.getSchema() != null && inpOp.getSchema().getSignature() != null ) {
             for(ColumnInfo ci : inpOp.getSchema().getSignature()) {
               Dependency inp_dep = lctx.getIndex().getDependency(inpOp, ci);
-            	// The dependency can be null as some of the input cis may not have
-            	// been set in case of joins.
-            	if (inp_dep != null) {
-            	  for(BaseColumnInfo bci : inp_dep.getBaseCols()) {
-            	    new_type = LineageCtx.getNewDependencyType(inp_dep.getType(), new_type);
-            	    tai_set.add(bci.getTabAlias());
-            	  }
-            	}
+              // The dependency can be null as some of the input cis may not have
+              // been set in case of joins.
+              if (inp_dep != null) {
+                for(BaseColumnInfo bci : inp_dep.getBaseCols()) {
+                  new_type = LineageCtx.getNewDependencyType(inp_dep.getType(), new_type);
+                  tai_set.add(bci.getTabAlias());
+                }
+              }
             }
           }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Mon Sep  8 04:38:17 2014
@@ -396,8 +396,7 @@ public final class PcrExprProcFactory {
         return new NodeInfoWrapper(WalkState.CONSTANT, null,
             (ExprNodeDesc) nd);
       }
-      assert (false);
-      return null;
+      return new NodeInfoWrapper(WalkState.UNKNOWN, null, (ExprNodeDesc)nd);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Mon Sep  8 04:38:17 2014
@@ -156,6 +156,10 @@ public class Vectorizer implements Physi
     // The regex matches only the "decimal" prefix of the type.
     patternBuilder.append("|decimal.*");
 
+    // CHAR and VARCHAR types can be specified with maximum length.
+    patternBuilder.append("|char.*");
+    patternBuilder.append("|varchar.*");
+
     supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
 
     supportedGenericUDFs.add(GenericUDFOPPlus.class);
@@ -248,6 +252,8 @@ public class Vectorizer implements Physi
     supportedGenericUDFs.add(GenericUDFTimestamp.class);
     supportedGenericUDFs.add(GenericUDFToDecimal.class);
     supportedGenericUDFs.add(GenericUDFToDate.class);
+    supportedGenericUDFs.add(GenericUDFToChar.class);
+    supportedGenericUDFs.add(GenericUDFToVarchar.class);
 
     // For conditional expressions
     supportedGenericUDFs.add(GenericUDFIf.class);

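Annotation: Vectorizer builds its supported-type check as a single alternation regex, and appending "char.*" and "varchar.*" lets parameterized type names such as char(10) and varchar(64) pass, the same way "decimal.*" already admits decimal(p,s). A small standalone check of that pattern style; the pattern literal below lists only a few types for illustration and is not the full Hive alternation.

    import java.util.regex.Pattern;

    // Sketch: an alternation pattern where ".*" after a type prefix admits
    // length/precision parameters.
    class TypeCheck {
      private static final Pattern SUPPORTED =
          Pattern.compile("int|bigint|string|decimal.*|char.*|varchar.*");

      static boolean isSupported(String typeName) {
        return SUPPORTED.matcher(typeName.toLowerCase()).matches();
      }

      public static void main(String[] args) {
        System.out.println(isSupported("varchar(64)"));   // true
        System.out.println(isSupported("char(10)"));      // true
        System.out.println(isSupported("map<int,int>"));  // false
      }
    }
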
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Mon Sep  8 04:38:17 2014
@@ -20,11 +20,11 @@ package org.apache.hadoop.hive.ql.optimi
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -164,7 +164,7 @@ public class StatsRulesProcFactory {
                   sop.getSchema());
           long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
           stats.setColumnStats(colStats);
-          stats.setDataSize(dataSize);
+          stats.setDataSize(setMaxIfInvalid(dataSize));
           sop.setStatistics(stats);
 
           if (LOG.isDebugEnabled()) {
@@ -250,7 +250,8 @@ public class StatsRulesProcFactory {
           ExprNodeDesc pred = fop.getConf().getPredicate();
 
           // evaluate filter expression and update statistics
-          long newNumRows = evaluateExpression(parentStats, pred, aspCtx, neededCols);
+          long newNumRows = evaluateExpression(parentStats, pred, aspCtx,
+              neededCols, fop);
           Statistics st = parentStats.clone();
 
           if (satisfyPrecondition(parentStats)) {
@@ -260,7 +261,7 @@ public class StatsRulesProcFactory {
             // result in number of rows getting more than the input rows in
             // which case stats need not be updated
             if (newNumRows <= parentStats.getNumRows()) {
-              updateStats(st, newNumRows, true);
+              updateStats(st, newNumRows, true, fop);
             }
 
             if (LOG.isDebugEnabled()) {
@@ -270,7 +271,7 @@ public class StatsRulesProcFactory {
 
             // update only the basic statistics in the absence of column statistics
             if (newNumRows <= parentStats.getNumRows()) {
-              updateStats(st, newNumRows, false);
+              updateStats(st, newNumRows, false, fop);
             }
 
             if (LOG.isDebugEnabled()) {
@@ -287,7 +288,8 @@ public class StatsRulesProcFactory {
     }
 
     private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
-        AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+        AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+        FilterOperator fop) throws CloneNotSupportedException {
       long newNumRows = 0;
       Statistics andStats = null;
 
@@ -302,24 +304,26 @@ public class StatsRulesProcFactory {
 
           // evaluate children
           for (ExprNodeDesc child : genFunc.getChildren()) {
-            newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child, aspCtx, neededCols);
+            newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child,
+                aspCtx, neededCols, fop);
             if (satisfyPrecondition(aspCtx.getAndExprStats())) {
-              updateStats(aspCtx.getAndExprStats(), newNumRows, true);
+              updateStats(aspCtx.getAndExprStats(), newNumRows, true, fop);
             } else {
-              updateStats(aspCtx.getAndExprStats(), newNumRows, false);
+              updateStats(aspCtx.getAndExprStats(), newNumRows, false, fop);
             }
           }
         } else if (udf instanceof GenericUDFOPOr) {
           // for OR condition independently compute and update stats
           for (ExprNodeDesc child : genFunc.getChildren()) {
-            newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols);
+            newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
+                fop);
           }
         } else if (udf instanceof GenericUDFOPNot) {
-          newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols);
+          newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
         } else {
 
           // single predicate condition
-          newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols);
+          newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols, fop);
         }
       } else if (pred instanceof ExprNodeColumnDesc) {
 
@@ -351,8 +355,9 @@ public class StatsRulesProcFactory {
       return newNumRows;
     }
 
-    private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsProcCtx aspCtx,
-        List<String> neededCols) throws CloneNotSupportedException {
+    private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred,
+        AnnotateStatsProcCtx aspCtx, List<String> neededCols, FilterOperator fop)
+        throws CloneNotSupportedException {
 
       long numRows = stats.getNumRows();
 
@@ -364,8 +369,9 @@ public class StatsRulesProcFactory {
 
             // GenericUDF
             long newNumRows = 0;
-            for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) pred).getChildren()) {
-              newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols);
+            for (ExprNodeDesc child : genFunc.getChildren()) {
+              newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols,
+                  fop);
             }
             return numRows - newNumRows;
           } else if (leaf instanceof ExprNodeConstantDesc) {
@@ -398,8 +404,7 @@ public class StatsRulesProcFactory {
       return numRows / 2;
     }
 
-    private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred,
-        AnnotateStatsProcCtx aspCtx) {
+    private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred) {
 
       long numRows = stats.getNumRows();
 
@@ -425,7 +430,8 @@ public class StatsRulesProcFactory {
     }
 
     private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
-        AnnotateStatsProcCtx aspCtx, List<String> neededCols) throws CloneNotSupportedException {
+        AnnotateStatsProcCtx aspCtx, List<String> neededCols,
+        FilterOperator fop) throws CloneNotSupportedException {
 
       long numRows = stats.getNumRows();
 
@@ -434,7 +440,8 @@ public class StatsRulesProcFactory {
         ExprNodeGenericFuncDesc genFunc = (ExprNodeGenericFuncDesc) child;
         GenericUDF udf = genFunc.getGenericUDF();
 
-        if (udf instanceof GenericUDFOPEqual || udf instanceof GenericUDFOPEqualNS) {
+        if (udf instanceof GenericUDFOPEqual ||
+            udf instanceof GenericUDFOPEqualNS) {
           String colName = null;
           String tabAlias = null;
           boolean isConst = false;
@@ -506,13 +513,13 @@ public class StatsRulesProcFactory {
             || udf instanceof GenericUDFOPLessThan) {
           return numRows / 3;
         } else if (udf instanceof GenericUDFOPNotNull) {
-          long newNumRows = evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+          long newNumRows = evaluateColEqualsNullExpr(stats, genFunc);
           return stats.getNumRows() - newNumRows;
         } else if (udf instanceof GenericUDFOPNull) {
-          return evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+          return evaluateColEqualsNullExpr(stats, genFunc);
         } else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
             || udf instanceof GenericUDFOPNot) {
-          return evaluateExpression(stats, genFunc, aspCtx, neededCols);
+          return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop);
         }
       }
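
The filter hunks above mostly thread the FilterOperator handle through the existing selectivity rules: range comparisons keep roughly a third of the incoming rows, IS NULL keeps the estimated null count, IS NOT NULL keeps the remainder, and unrecognized predicates fall back to half the rows. The self-contained sketch below restates those rules outside Hive's operator tree; class and method names are illustrative, and the equality rule (rows divided by the column's distinct-value count) is the standard heuristic assumed here rather than something visible in this hunk.

// Hypothetical, self-contained restatement of the selectivity heuristics above;
// not Hive code, just an illustration of the row-count estimates.
public class SelectivitySketch {

  enum PredicateKind { EQUALS, RANGE_COMPARISON, IS_NULL, IS_NOT_NULL, UNKNOWN }

  // numRows: rows entering the filter, ndv: distinct values of the filtered
  // column, numNulls: estimated null count for that column
  static long estimateRows(long numRows, long ndv, long numNulls, PredicateKind kind) {
    switch (kind) {
      case EQUALS:
        // assumed heuristic: equality keeps about numRows / NDV rows
        return ndv > 0 ? numRows / ndv : numRows;
      case RANGE_COMPARISON:
        // <, <=, >, >= keep roughly a third of the rows
        return numRows / 3;
      case IS_NULL:
        // IS NULL keeps the estimated nulls
        return Math.min(numNulls, numRows);
      case IS_NOT_NULL:
        // IS NOT NULL keeps everything except the estimated nulls
        return numRows - Math.min(numNulls, numRows);
      default:
        // worst-case fallback: half the rows
        return numRows / 2;
    }
  }

  public static void main(String[] args) {
    System.out.println(estimateRows(9000, 30, 500, PredicateKind.EQUALS));      // 300
    System.out.println(estimateRows(9000, 30, 500, PredicateKind.IS_NOT_NULL)); // 8500
  }
}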
 
@@ -580,6 +587,23 @@ public class StatsRulesProcFactory {
       Map<String, ExprNodeDesc> colExprMap = gop.getColumnExprMap();
       RowSchema rs = gop.getSchema();
       Statistics stats = null;
+      boolean mapSide = false;
+      int multiplier = mapSideParallelism;
+      long newNumRows;
+      long newDataSize;
+
+      // map side
+      if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
+          gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
+
+        mapSide = true;
+
+        // map-side grouping set present. if grouping set is present then
+        // multiply the number of rows by number of elements in grouping set
+        if (gop.getConf().isGroupingSetsPresent()) {
+          multiplier *= gop.getConf().getListGroupingSets().size();
+        }
+      }
 
       try {
         if (satisfyPrecondition(parentStats)) {
@@ -589,7 +613,6 @@ public class StatsRulesProcFactory {
               StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
           stats.setColumnStats(colStats);
           long dvProd = 1;
-          long newNumRows = 0;
 
           // compute product of distinct values of grouping columns
           for (ColStatistics cs : colStats) {
@@ -617,7 +640,7 @@ public class StatsRulesProcFactory {
           }
 
           // map side
-          if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+          if (mapSide) {
 
             // since we do not know if hash-aggregation will be enabled or disabled
             // at runtime we will assume that map-side group by does not do any
@@ -626,14 +649,10 @@ public class StatsRulesProcFactory {
             // map-side grouping set present. if grouping set is present then
             // multiply the number of rows by number of elements in grouping set
             if (gop.getConf().isGroupingSetsPresent()) {
-              int multiplier = gop.getConf().getListGroupingSets().size();
-
-              // take into account the map-side parallelism as well, default is 1
-              multiplier *= mapSideParallelism;
-              newNumRows = multiplier * stats.getNumRows();
-              long dataSize = multiplier * stats.getDataSize();
+              newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
+              newDataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
               stats.setNumRows(newNumRows);
-              stats.setDataSize(dataSize);
+              stats.setDataSize(newDataSize);
               for (ColStatistics cs : colStats) {
                 if (cs != null) {
                   long oldNumNulls = cs.getNumNulls();
@@ -644,29 +663,33 @@ public class StatsRulesProcFactory {
             } else {
 
               // map side no grouping set
-              newNumRows = stats.getNumRows() * mapSideParallelism;
-              updateStats(stats, newNumRows, true);
+              newNumRows = stats.getNumRows() * multiplier;
+              updateStats(stats, newNumRows, true, gop);
             }
           } else {
 
             // reduce side
             newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
-            updateStats(stats, newNumRows, true);
+            updateStats(stats, newNumRows, true, gop);
           }
         } else {
           if (parentStats != null) {
 
+            stats = parentStats.clone();
+
             // worst case, in the absence of column statistics assume half the rows are emitted
-            if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+            if (mapSide) {
 
               // map side
-              stats = parentStats.clone();
+              newNumRows = multiplier * stats.getNumRows();
+              newDataSize = multiplier * stats.getDataSize();
+              stats.setNumRows(newNumRows);
+              stats.setDataSize(newDataSize);
             } else {
 
               // reduce side
-              stats = parentStats.clone();
-              long newNumRows = parentStats.getNumRows() / 2;
-              updateStats(stats, newNumRows, false);
+              newNumRows = parentStats.getNumRows() / 2;
+              updateStats(stats, newNumRows, false, gop);
             }
           }
         }
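
In the map-side branch above the row and data sizes are scaled by the assumed map-side parallelism and, when grouping sets are present, additionally by the number of grouping sets, with setMaxIfInvalid clamping any overflow. A minimal sketch of that sizing rule under those assumptions (the names are illustrative, not Hive's API):

// Illustrative sketch of the map-side group-by sizing rule shown above.
public class MapSideGbySizing {

  static long setMaxIfInvalid(long val) {
    // a negative value indicates long overflow; clamp to Long.MAX_VALUE
    return val < 0 ? Long.MAX_VALUE : val;
  }

  static long estimateMapSideRows(long inputRows, int mapSideParallelism,
      int numGroupingSets) {
    long multiplier = mapSideParallelism;
    if (numGroupingSets > 0) {
      // each grouping set can emit its own copy of every input row
      multiplier *= numGroupingSets;
    }
    return setMaxIfInvalid(multiplier * inputRows);
  }

  public static void main(String[] args) {
    // 1M input rows, parallelism 1, CUBE over three columns => 8 grouping sets
    System.out.println(estimateMapSideRows(1000000L, 1, 8)); // 8000000
  }
}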
@@ -700,7 +723,7 @@ public class StatsRulesProcFactory {
             // only if the column stats is available, update the data size from
             // the column stats
             if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
-              updateStats(stats, stats.getNumRows(), true);
+              updateStats(stats, stats.getNumRows(), true, gop);
             }
           }
 
@@ -709,7 +732,7 @@ public class StatsRulesProcFactory {
           // rows will be 1
           if (colExprMap.isEmpty()) {
             stats.setNumRows(1);
-            updateStats(stats, 1, true);
+            updateStats(stats, 1, true, gop);
           }
         }
 
@@ -817,6 +840,7 @@ public class StatsRulesProcFactory {
 
           Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
           Map<Integer, List<String>> joinKeys = Maps.newHashMap();
+          List<Long> rowCounts = Lists.newArrayList();
 
           // get the join keys from parent ReduceSink operators
           for (int pos = 0; pos < parents.size(); pos++) {
@@ -836,6 +860,7 @@ public class StatsRulesProcFactory {
             for (String tabAlias : tableAliases) {
               rowCountParents.put(tabAlias, parentStats.getNumRows());
             }
+            rowCounts.add(parentStats.getNumRows());
 
             // multi-attribute join key
             if (keyExprs.size() > 1) {
@@ -936,22 +961,14 @@ public class StatsRulesProcFactory {
 
           // update join statistics
           stats.setColumnStats(outColStats);
-          long newRowCount = computeNewRowCount(
-              Lists.newArrayList(rowCountParents.values()), denom);
-
-          if (newRowCount <= 0 && LOG.isDebugEnabled()) {
-            newRowCount = 0;
-            LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
-                + " denominator or overflow might have occurred. Resetting row count to 0."
-                + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
-          }
+          long newRowCount = computeNewRowCount(rowCounts, denom);
 
-          updateStatsForJoinType(stats, newRowCount, jop.getConf(),
-              rowCountParents, outInTabAlias);
+          updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
+              outInTabAlias);
           jop.setStatistics(stats);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
+            LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
           }
         } else {
 
@@ -979,14 +996,13 @@ public class StatsRulesProcFactory {
           long maxDataSize = parentSizes.get(maxRowIdx);
           long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
           long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
-
           Statistics wcStats = new Statistics();
-          wcStats.setNumRows(newNumRows);
-          wcStats.setDataSize(newDataSize);
+          wcStats.setNumRows(setMaxIfInvalid(newNumRows));
+          wcStats.setDataSize(setMaxIfInvalid(newDataSize));
           jop.setStatistics(wcStats);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+            LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
           }
         }
       }
@@ -1008,8 +1024,15 @@ public class StatsRulesProcFactory {
     }
 
     private void updateStatsForJoinType(Statistics stats, long newNumRows,
-        JoinDesc conf, Map<String, Long> rowCountParents,
+        CommonJoinOperator<? extends JoinDesc> jop,
+        Map<String, Long> rowCountParents,
         Map<String, String> outInTabAlias) {
+
+      if (newNumRows <= 0) {
+        LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows; "
+          + newNumRows + " will be set to Long.MAX_VALUE");
+      }
+      newNumRows = setMaxIfInvalid(newNumRows);
       stats.setNumRows(newNumRows);
 
       // scale down/up the column statistics based on the changes in number of
@@ -1040,7 +1063,7 @@ public class StatsRulesProcFactory {
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils
           .getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     }
 
     private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1168,7 +1191,7 @@ public class StatsRulesProcFactory {
           // if limit is greater than available rows then do not update
           // statistics
           if (limit <= parentStats.getNumRows()) {
-            updateStats(stats, limit, true);
+            updateStats(stats, limit, true, lop);
           }
           lop.setStatistics(stats);
 
@@ -1185,8 +1208,8 @@ public class StatsRulesProcFactory {
               long numRows = limit;
               long avgRowSize = parentStats.getAvgRowSize();
               long dataSize = avgRowSize * limit;
-              wcStats.setNumRows(numRows);
-              wcStats.setDataSize(dataSize);
+              wcStats.setNumRows(setMaxIfInvalid(numRows));
+              wcStats.setDataSize(setMaxIfInvalid(dataSize));
             }
             lop.setStatistics(wcStats);
 
@@ -1364,7 +1387,15 @@ public class StatsRulesProcFactory {
    * @param useColStats
    *          - use column statistics to compute data size
    */
-  static void updateStats(Statistics stats, long newNumRows, boolean useColStats) {
+  static void updateStats(Statistics stats, long newNumRows,
+      boolean useColStats, Operator<? extends OperatorDesc> op) {
+
+    if (newNumRows <= 0) {
+      LOG.info("STATS-" + op.toString() + ": Overflow in number of rows; "
+          + newNumRows + " will be set to Long.MAX_VALUE");
+    }
+
+    newNumRows = setMaxIfInvalid(newNumRows);
     long oldRowCount = stats.getNumRows();
     double ratio = (double) newNumRows / (double) oldRowCount;
     stats.setNumRows(newNumRows);
@@ -1389,10 +1420,10 @@ public class StatsRulesProcFactory {
       }
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     } else {
       long newDataSize = (long) (ratio * stats.getDataSize());
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     }
   }
 
@@ -1401,4 +1432,13 @@ public class StatsRulesProcFactory {
         && !stats.getColumnStatsState().equals(Statistics.State.NONE);
   }
 
+  /**
+   * A negative number of rows or data size is invalid and usually indicates
+   * long overflow, in which case Long.MAX_VALUE is returned.
+   * @param val - input value
+   * @return Long.MAX_VALUE if val is negative else val
+   */
+  static long setMaxIfInvalid(long val) {
+    return val < 0 ? Long.MAX_VALUE : val;
+  }
 }
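
setMaxIfInvalid treats a negative result as a sign of long overflow and clamps it to Long.MAX_VALUE. For comparison only, a stricter check can use exact arithmetic; the sketch below is an alternative under the assumption that Java 8's Math.multiplyExact is available, and is not part of this patch.

// Overflow-aware multiply shown only for comparison with setMaxIfInvalid(...);
// a sketch, not Hive code. Math.multiplyExact requires Java 8.
public class SaturatingMath {

  static long saturatingMultiply(long a, long b) {
    try {
      return Math.multiplyExact(a, b); // throws ArithmeticException on overflow
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    System.out.println(saturatingMultiply(3L, 4L));                 // 12
    System.out.println(saturatingMultiply(Long.MAX_VALUE / 2, 3L)); // Long.MAX_VALUE
  }
}

The clamp in the patch only catches overflows that wrap to a negative long; an exact multiply would also catch wraps that land back in the positive range, at the cost of the newer JDK requirement.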

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Mon Sep  8 04:38:17 2014
@@ -289,7 +289,8 @@ public abstract class BaseSemanticAnalyz
   }
 
   public static String getUnescapedName(ASTNode tableOrColumnNode, String currentDatabase) {
-    if (tableOrColumnNode.getToken().getType() == HiveParser.TOK_TABNAME) {
+    int tokenType = tableOrColumnNode.getToken().getType();
+    if (tokenType == HiveParser.TOK_TABNAME) {
       // table node
       if (tableOrColumnNode.getChildCount() == 2) {
         String dbName = unescapeIdentifier(tableOrColumnNode.getChild(0).getText());
@@ -301,6 +302,8 @@ public abstract class BaseSemanticAnalyz
         return currentDatabase + "." + tableName;
       }
       return tableName;
+    } else if (tokenType == HiveParser.StringLiteral) {
+      return unescapeSQLString(tableOrColumnNode.getText());
     }
     // column node
     return unescapeIdentifier(tableOrColumnNode.getText());
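
The new StringLiteral branch above lets quoted names be unescaped with unescapeSQLString instead of being treated as identifiers. A simplified, hypothetical stand-in for that token-type dispatch (the real code uses HiveParser token constants and BaseSemanticAnalyzer's unescape helpers, which are only approximated here):

// Hypothetical stand-in for the token-type dispatch in getUnescapedName; the
// enum and the unescaping are simplified placeholders, not Hive's parser API.
public class NameResolverSketch {

  enum TokenType { TABNAME, STRING_LITERAL, IDENTIFIER }

  static String unescapedName(TokenType type, String text, String currentDatabase) {
    switch (type) {
      case TABNAME:
        // qualify with the current database unless the name is already db.table
        return text.contains(".") ? text : currentDatabase + "." + text;
      case STRING_LITERAL:
        // strip the surrounding quotes; the real helper also resolves escapes
        return text.substring(1, text.length() - 1);
      default:
        // identifiers: strip surrounding backticks if present
        return text.replace("`", "");
    }
  }

  public static void main(String[] args) {
    System.out.println(unescapedName(TokenType.TABNAME, "t1", "default"));                   // default.t1
    System.out.println(unescapedName(TokenType.STRING_LITERAL, "'s3://bucket'", "default")); // s3://bucket
  }
}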

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Sep  8 04:38:17 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 
 /**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
  *
  */
 public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
   static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the
-   * operator, we might as well do it here.
-   */
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procCtx, Object... nodeOutputs)
       throws SemanticException {
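
Per the updated class comment, the processor now only records each file sink it sees; merge, move, and stats tasks are attached later in compilation. A minimal, self-contained sketch of that idea with stand-in types (FileSink and Context are placeholders, not the real FileSinkOperator and GenTezProcContext):

import java.util.LinkedHashSet;
import java.util.Set;

// Minimal sketch of a "remember the file sinks for later processing" rule;
// FileSink and Context stand in for FileSinkOperator and GenTezProcContext.
public class FileSinkRecorderSketch {

  static class FileSink {
    final String path;
    FileSink(String path) { this.path = path; }
  }

  static class Context {
    final Set<FileSink> fileSinkSet = new LinkedHashSet<FileSink>();
  }

  // the "rule": just remember the sink; merge/move/stats tasks come later
  static void process(Context ctx, FileSink sink) {
    ctx.fileSinkSet.add(sink);
  }

  public static void main(String[] args) {
    Context ctx = new Context();
    process(ctx, new FileSink("/tmp/hive/output_1"));
    System.out.println(ctx.fileSinkSet.size()); // 1
  }
}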

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Sep  8 04:38:17 2014
@@ -26,29 +26,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
 
+  // remember the event operators we've seen
+  public final Set<AppMasterEventOperator> eventOperatorSet;
+
+  // remember the event operators we've abandoned.
+  public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+  // remember the connections between ts and event
+  public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
     this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+    this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
 
     rootTasks.add(currentTask);
   }
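
tsToEventMap records, for each table scan, the dynamic-pruning event operators that reference it, so the clones can be re-wired later (see the GenTezUtils change below). The bookkeeping is the usual map-of-lists idiom; a generic sketch with stand-in type parameters:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Generic sketch of the table-scan-to-event bookkeeping added above; TableScan
// and EventOp are type parameters standing in for the Hive operator types.
public class TsToEventRegistry<TableScan, EventOp> {

  private final Map<TableScan, List<EventOp>> tsToEventMap =
      new LinkedHashMap<TableScan, List<EventOp>>();

  // register an event operator under the table scan it prunes
  public void register(TableScan ts, EventOp event) {
    List<EventOp> events = tsToEventMap.get(ts);
    if (events == null) {
      events = new ArrayList<EventOp>();
      tsToEventMap.put(ts, events);
    }
    events.add(event);
  }

  // the events whose table-scan reference must be fixed up after cloning
  public List<EventOp> eventsFor(TableScan ts) {
    List<EventOp> events = tsToEventMap.get(ts);
    return events == null ? new ArrayList<EventOp>() : events;
  }
}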

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Mon Sep  8 04:38:17 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
 /**
  * GenTezUtils is a collection of shared helper methods to produce
  * TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
       int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
 
       // min we allow tez to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() 
+      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
         * minPartitionFactor));
       minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
 
       // max we allow tez to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); 
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
       maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
 
       reduceWork.setMinReduceTasks(minPartition);
@@ -177,10 +182,19 @@ public class GenTezUtils {
 
     // map work starts with table scan operators
     assert root instanceof TableScanOperator;
-    String alias = ((TableScanOperator)root).getConf().getAlias();
+    TableScanOperator ts = (TableScanOperator) root;
+
+    String alias = ts.getConf().getAlias();
 
     setupMapWork(mapWork, context, partitions, root, alias);
 
+    if (context.parseContext != null
+        && context.parseContext.getTopToTable() != null
+        && context.parseContext.getTopToTable().containsKey(ts)
+        && context.parseContext.getTopToTable().get(ts).isDummyTable()) {
+      mapWork.setDummyTableScan(true);
+    }
+
     // add new item to the tez work
     tezWork.add(mapWork);
 
@@ -201,18 +215,20 @@ public class GenTezUtils {
       BaseWork work)
     throws SemanticException {
 
-    Set<Operator<?>> roots = work.getAllRootOperators();
+    List<Operator<?>> roots = new ArrayList<Operator<?>>();
+    roots.addAll(work.getAllRootOperators());
     if (work.getDummyOps() != null) {
       roots.addAll(work.getDummyOps());
     }
+    roots.addAll(context.eventOperatorSet);
 
     // need to clone the plan.
-    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
 
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
     // tells you what that mapping is.
-    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+    BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
 
     // there's some special handling for dummyOps required. Mapjoins won't be properly
     // initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -222,11 +238,35 @@ public class GenTezUtils {
     Iterator<Operator<?>> it = newRoots.iterator();
     for (Operator<?> orig: roots) {
       Operator<?> newRoot = it.next();
+
+      replacementMap.put(orig, newRoot);
+
       if (newRoot instanceof HashTableDummyOperator) {
-        dummyOps.add((HashTableDummyOperator)newRoot);
+        // dummy ops need to be updated to the cloned ones.
+        dummyOps.add((HashTableDummyOperator) newRoot);
+        it.remove();
+      } else if (newRoot instanceof AppMasterEventOperator) {
+        // event operators point to table scan operators. When cloning these we
+        // need to restore the original scan.
+        if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+          TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+          if (ts == null) {
+            throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+          }
+          ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+        }
         it.remove();
       } else {
-        replacementMap.put(orig,newRoot);
+        if (newRoot instanceof TableScanOperator) {
+          if (context.tsToEventMap.containsKey(orig)) {
+            // we need to update event operators with the cloned table scan
+            for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+              ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+            }
+          }
+        }
+        context.rootToWorkMap.remove(orig);
+        context.rootToWorkMap.put(newRoot, work);
       }
     }
 
@@ -263,6 +303,15 @@ public class GenTezUtils {
         desc.setLinkedFileSinkDesc(linked);
       }
 
+      if (current instanceof AppMasterEventOperator) {
+        // remember for additional processing later
+        context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+        // mark the original as abandoned. Don't need it anymore.
+        context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+            .get(current));
+      }
+
       if (current instanceof UnionOperator) {
         Operator<?> parent = null;
         int count = 0;
@@ -328,4 +377,87 @@ public class GenTezUtils {
       }
     }
   }
+
+  /**
+   * processAppMasterEvent sets up the event descriptor and the MapWork.
+   *
+   * @param procCtx the Tez generation context that tracks seen and abandoned events
+   * @param event the AppMasterEventOperator to wire into its source MapWork
+   */
+  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+    if (procCtx.abandonedEventOperatorSet.contains(event)) {
+      // don't need this anymore
+      return;
+    }
+
+    DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+    TableScanOperator ts = eventDesc.getTableScan();
+
+    MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+    if (work == null) {
+      throw new AssertionError("No work found for tablescan " + ts);
+    }
+
+    BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+    if (enclosingWork == null) {
+      throw new AssertionError("Cannot find work for operator " + event);
+    }
+    String sourceName = enclosingWork.getName();
+
+    // store the vertex name in the operator pipeline
+    eventDesc.setVertexName(work.getName());
+    eventDesc.setInputName(work.getAliases().get(0));
+
+    // store table descriptor in map-work
+    if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+      work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+    }
+    List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+    tables.add(event.getConf().getTable());
+
+    // store column name in map-work
+    if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+      work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+    }
+    List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+    columns.add(eventDesc.getTargetColumnName());
+
+    // store partition key expr in map-work
+    if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+      work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+    }
+    List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+    keys.add(eventDesc.getPartKey());
+
+  }
+
+  /**
+   * getEnclosingWork finds the BaseWork a given operator belongs to.
+   */
+  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    findRoots(op, ops);
+    for (Operator<?> r : ops) {
+      BaseWork work = procCtx.rootToWorkMap.get(r);
+      if (work != null) {
+        return work;
+      }
+    }
+    return null;
+  }
+
+  /*
+   * findRoots collects into ops all root operators that eventually lead to op.
+   */
+  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      ops.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, ops);
+    }
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java Mon Sep  8 04:38:17 2014
@@ -445,7 +445,7 @@ public class ImportSemanticAnalyzer exte
        * substitute OutputFormat name based on HiveFileFormatUtils.outputFormatSubstituteMap
        */
       try {
-        Class<?> origin = Class.forName(importedofc, true, JavaUtils.getClassLoader());
+        Class<?> origin = Class.forName(importedofc, true, Utilities.getSessionSpecifiedClassLoader());
         Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
             .getOutputFormatSubstitute(origin,false);
         if (replaced == null) {