You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/18 01:48:46 UTC

svn commit: r1640263 [6/12] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ accu...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 18 00:48:40 2014
@@ -1135,7 +1135,9 @@ public class FileSinkOperator extends Te
       String postfix=null;
       if (taskIndependent) {
         // key = "database.table/SP/DP/"LB/
-        prefix = conf.getTableInfo().getTableName();
+        // Hive stores table names in lowercase in the metastore, and Counters are case sensitive, so we
+        // use the lowercase table name as the prefix here: StatsTask reads the name from the metastore to fetch counters.
+        prefix = conf.getTableInfo().getTableName().toLowerCase();
       } else {
         // key = "prefix/SP/DP/"LB/taskID/
         prefix = conf.getStatsAggPrefix();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -33,6 +33,6 @@ public interface HashTableLoader {
 
   void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp);
 
-  void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
-      throws HiveException;
+  void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException;
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs
     }
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(getExecContext(), hconf, this);
-    loader.load(mapJoinTables, mapJoinTableSerdes);
+    long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
+        * conf.getHashTableMemoryUsage());
+    loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
     if (!conf.isBucketMapJoin()) {
       /*
        * The issue with caching in case of bucket map join is that different tasks

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Tue Nov 18 00:48:40 2014
@@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand
     this.console = console;
     this.maxMemoryUsage = maxMemoryUsage;
     this.memoryMXBean = ManagementFactory.getMemoryMXBean();
-    long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+    this.maxHeapSize = getMaxHeapSize(memoryMXBean);
+    percentageNumberFormat = NumberFormat.getInstance();
+    percentageNumberFormat.setMinimumFractionDigits(2);
+    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+  }
+
+  public static long getMaxHeapSize() {
+    return getMaxHeapSize(ManagementFactory.getMemoryMXBean());
+  }
+
+  private static long getMaxHeapSize(MemoryMXBean bean) {
+    long maxHeapSize = bean.getHeapMemoryUsage().getMax();
     /*
      * According to the javadoc, getMax() can return -1. In this case
      * default to 200MB. This will probably never actually happen.
      */
     if(maxHeapSize == -1) {
-      this.maxHeapSize = 200L * 1024L * 1024L;
       LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
           "defaulting maxHeapSize to 200MB");
-    } else {
-      this.maxHeapSize = maxHeapSize;
+      return 200L * 1024L * 1024L;
     }
-    percentageNumberFormat = NumberFormat.getInstance();
-    percentageNumberFormat.setMinimumFractionDigits(2);
-    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+    return maxHeapSize;
   }
+
   /**
    * Throws MapJoinMemoryExhaustionException when the JVM has consumed the
    * configured percentage of memory. The arguments are used simply for the error

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -72,7 +72,7 @@ public class HashTableLoader implements 
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     String currentInputPath = context.getCurrentInputPath().toString();
     LOG.info("******* Load from HashTable for input file: " + currentInputPath);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Tue Nov 18 00:48:40 2014
@@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa
 
   /** We have 39 bits to store list pointer from the first record; this is size limit */
   final static long MAX_WB_SIZE = ((long)1) << 38;
+  /** 8 GB of refs is the max capacity if the memory limit is not specified. If someone has hundreds
+   * of GB of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
+  private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
 
-  public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+  public BytesBytesMultiHashMap(int initialCapacity,
+      float loadFactor, int wbSize, long memUsage, int defaultCapacity) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
+    assert initialCapacity > 0;
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+    // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
+    int maxCapacity =  (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    if (maxCapacity < initialCapacity || initialCapacity <= 0) {
+      // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
+      initialCapacity = (Long.bitCount(maxCapacity) == 1)
+          ? maxCapacity : nextLowestPowerOfTwo(maxCapacity);
+    }
+
     validateCapacity(initialCapacity);
     startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
     this.loadFactor = loadFactor;
@@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa
     resizeThreshold = (int)(initialCapacity * this.loadFactor);
   }
 
+  @VisibleForTesting
+  BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+    this(initialCapacity, loadFactor, wbSize, -1, 100000);
+  }
+
   /** The source of keys and values to put into hashtable; avoids byte copying. */
   public static interface KvSource {
     /** Write key into output. */
@@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa
     return Integer.highestOneBit(v) << 1;
   }
 
+  private static int nextLowestPowerOfTwo(int v) {
+    return Integer.highestOneBit(v);
+  }
+
   @VisibleForTesting
   int getCapacity() {
     return refs.length;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 
@@ -60,17 +78,20 @@ public class MapJoinBytesTableContainer 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf,
-      MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
+      MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount);
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+        valCtx, keyCount, memUsage);
   }
 
   private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
-    threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
-    hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize);
+      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage)
+          throws SerDeException {
+    int newThreshold = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+    hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold);
   }
 
   private LazyBinaryStructObjectInspector createInternalOi(

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -69,7 +70,7 @@ public class HashTableLoader implements 
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
@@ -106,7 +107,7 @@ public class HashTableLoader implements 
         Long keyCountObj = parentKeyCounts.get(pos);
         long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
         MapJoinTableContainer tableContainer = useOptimizedTables
-            ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount)
+            ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Tue Nov 18 00:48:40 2014
@@ -45,6 +45,8 @@ import org.apache.tez.dag.api.client.Sta
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.fusesource.jansi.Ansi;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
@@ -132,6 +134,11 @@ public class TezJobMonitor {
     });
   }
 
+  public static void initShutdownHook() {
+    Preconditions.checkNotNull(shutdownList,
+        "Shutdown hook was not properly initialized");
+  }
+
   public TezJobMonitor() {
     console = SessionState.getConsole();
     secondsFormat = new DecimalFormat("#0.00");
@@ -290,6 +297,7 @@ public class TezJobMonitor {
             break;
           case INITING:
             console.printInfo("Status: Initializing");
+            startTime = System.currentTimeMillis();
             break;
           case RUNNING:
             if (!running) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Tue Nov 18 00:48:40 2014
@@ -187,6 +187,7 @@ public class TezSessionState {
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");
 
+    TezJobMonitor.initShutdownHook();
     session.start();
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.Context;
@@ -170,7 +171,8 @@ public class TezTask extends Task<TezWor
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
       TezSessionPoolManager.getInstance().returnSession(session);
 
-      if (LOG.isInfoEnabled() && counters != null) {
+      if (LOG.isInfoEnabled() && counters != null
+          && conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
         for (CounterGroup group: counters) {
           LOG.info(group.getDisplayName() +":");
           for (TezCounter counter: group) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -89,6 +90,8 @@ public class ATSHook implements ExecuteW
   @Override
   public void run(final HookContext hookContext) throws Exception {
     final long currentTime = System.currentTimeMillis();
+    final HiveConf conf = new HiveConf(hookContext.getConf());
+
     executor.submit(new Runnable() {
         @Override
         public void run() {
@@ -110,19 +113,19 @@ public class ATSHook implements ExecuteW
             switch(hookContext.getHookType()) {
             case PRE_EXEC_HOOK:
               ExplainTask explain = new ExplainTask();
-              explain.initialize(hookContext.getConf(), plan, null);
+              explain.initialize(conf, plan, null);
               String query = plan.getQueryStr();
               List<Task<?>> rootTasks = plan.getRootTasks();
               JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks,
                    plan.getFetchTask(), true, false, false);
-              fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query,
+              fireAndForget(conf, createPreHookEvent(queryId, query,
                    explainPlan, queryStartTime, user, numMrJobs, numTezJobs));
               break;
             case POST_EXEC_HOOK:
-              fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, true));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true));
               break;
             case ON_FAILURE_HOOK:
-              fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, false));
+              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false));
               break;
             default:
               //ignore

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Tue Nov 18 00:48:40 2014
@@ -394,7 +394,8 @@ class ColumnStatisticsImpl implements Co
       } else if (str.minimum != null) {
         if (minimum.compareTo(str.minimum) > 0) {
           minimum = new Text(str.getMinimum());
-        } else if (maximum.compareTo(str.maximum) < 0) {
+        }
+        if (maximum.compareTo(str.maximum) < 0) {
           maximum = new Text(str.getMaximum());
         }
       }
@@ -563,7 +564,8 @@ class ColumnStatisticsImpl implements Co
       } else if (dec.minimum != null) {
         if (minimum.compareTo(dec.minimum) > 0) {
           minimum = dec.minimum;
-        } else if (maximum.compareTo(dec.maximum) < 0) {
+        }
+        if (maximum.compareTo(dec.maximum) < 0) {
           maximum = dec.maximum;
         }
         if (sum == null || dec.sum == null) {
@@ -671,7 +673,8 @@ class ColumnStatisticsImpl implements Co
       } else if (dateStats.minimum != null) {
         if (minimum > dateStats.minimum) {
           minimum = dateStats.minimum;
-        } else if (maximum < dateStats.maximum) {
+        }
+        if (maximum < dateStats.maximum) {
           maximum = dateStats.maximum;
         }
       }
@@ -767,7 +770,8 @@ class ColumnStatisticsImpl implements Co
       } else if (timestampStats.minimum != null) {
         if (minimum > timestampStats.minimum) {
           minimum = timestampStats.minimum;
-        } else if (maximum < timestampStats.maximum) {
+        }
+        if (maximum < timestampStats.maximum) {
           maximum = timestampStats.maximum;
         }
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Nov 18 00:48:40 2014
@@ -65,6 +65,8 @@ public final class FileDump {
       System.out.println("Structure for " + filename);
       Path path = new Path(filename);
       Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      System.out.println("File Version: " + reader.getFileVersion().getName() +
+                         " with " + reader.getWriterVersion());
       RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
       System.out.println("Rows: " + reader.getNumberOfRows());
       System.out.println("Compression: " + reader.getCompression());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Nov 18 00:48:40 2014
@@ -97,6 +97,26 @@ public final class OrcFile {
     }
   }
 
+  /**
+   * Records the version of the writer in terms of which bugs have been fixed.
+   * When a bug is fixed in the writer but old readers already read the new data
+   * correctly, bump this version instead of the Version.
+   */
+  public static enum WriterVersion {
+    ORIGINAL(0),
+      HIVE_8732(1); // corrupted stripe/file maximum column statistics
+
+    private final int id;
+
+    public int getId() {
+      return id;
+    }
+
+    private WriterVersion(int id) {
+      this.id = id;
+    }
+  }
+
   public static enum EncodingStrategy {
     SPEED, COMPRESSION;
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Nov 18 00:48:40 2014
@@ -630,6 +630,7 @@ public class OrcInputFormat  implements 
     private final boolean isOriginal;
     private final List<Long> deltas;
     private final boolean hasBase;
+    private OrcFile.WriterVersion writerVersion;
 
     SplitGenerator(Context context, FileSystem fs,
                    FileStatus file, FileInfo fileInfo,
@@ -775,7 +776,9 @@ public class OrcInputFormat  implements 
           Reader.Options options = new Reader.Options();
           setIncludedColumns(options, types, context.conf, isOriginal);
           setSearchArgument(options, types, context.conf, isOriginal);
-          if (options.getSearchArgument() != null) {
+          // only do split pruning if HIVE-8732 has been fixed in the writer
+          if (options.getSearchArgument() != null &&
+              writerVersion != OrcFile.WriterVersion.ORIGINAL) {
             SearchArgument sarg = options.getSearchArgument();
             List<PredicateLeaf> sargLeaves = sarg.getLeaves();
             List<StripeStatistics> stripeStats = metadata.getStripeStatistics();
@@ -866,6 +869,7 @@ public class OrcInputFormat  implements 
           fileMetaInfo = fileInfo.fileMetaInfo;
           metadata = fileInfo.metadata;
           types = fileInfo.types;
+          writerVersion = fileInfo.writerVersion;
           // For multiple runs, in case sendSplitsInFooter changes
           if (fileMetaInfo == null && context.footerInSplits) {
             orcReader = OrcFile.createReader(file.getPath(),
@@ -873,6 +877,7 @@ public class OrcInputFormat  implements 
             fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
             fileInfo.metadata = orcReader.getMetadata();
             fileInfo.types = orcReader.getTypes();
+            fileInfo.writerVersion = orcReader.getWriterVersion();
           }
         } else {
           orcReader = OrcFile.createReader(file.getPath(),
@@ -880,13 +885,14 @@ public class OrcInputFormat  implements 
           stripes = orcReader.getStripes();
           metadata = orcReader.getMetadata();
           types = orcReader.getTypes();
+          writerVersion = orcReader.getWriterVersion();
           fileMetaInfo = context.footerInSplits ?
               ((ReaderImpl) orcReader).getFileMetaInfo() : null;
           if (context.cacheStripeDetails) {
             // Populate into cache.
             Context.footerCache.put(file.getPath(),
                 new FileInfo(file.getModificationTime(), file.getLen(), stripes,
-                    metadata, types, fileMetaInfo));
+                    metadata, types, fileMetaInfo, writerVersion));
           }
         }
       } catch (Throwable th) {
@@ -981,18 +987,21 @@ public class OrcInputFormat  implements 
     ReaderImpl.FileMetaInfo fileMetaInfo;
     Metadata metadata;
     List<OrcProto.Type> types;
+    private OrcFile.WriterVersion writerVersion;
 
 
     FileInfo(long modificationTime, long size,
              List<StripeInformation> stripeInfos,
              Metadata metadata, List<OrcProto.Type> types,
-             ReaderImpl.FileMetaInfo fileMetaInfo) {
+             ReaderImpl.FileMetaInfo fileMetaInfo,
+             OrcFile.WriterVersion writerVersion) {
       this.modificationTime = modificationTime;
       this.size = size;
       this.stripeInfos = stripeInfos;
       this.fileMetaInfo = fileMetaInfo;
       this.metadata = metadata;
       this.types = types;
+      this.writerVersion = writerVersion;
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ public class OrcNewSplit extends FileSpl
   private boolean isOriginal;
   private boolean hasBase;
   private final List<Long> deltas = new ArrayList<Long>();
+  private OrcFile.WriterVersion writerVersion;
 
   protected OrcNewSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -83,6 +84,7 @@ public class OrcNewSplit extends FileSpl
       WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
       out.write(footerBuff.array(), footerBuff.position(),
           footerBuff.limit() - footerBuff.position());
+      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
   }
 
@@ -111,9 +113,11 @@ public class OrcNewSplit extends FileSpl
       int footerBuffSize = WritableUtils.readVInt(in);
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
+      OrcFile.WriterVersion writerVersion =
+          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
 
       fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff);
+          metadataSize, footerBuff, writerVersion);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Tue Nov 18 00:48:40 2014
@@ -42,6 +42,7 @@ public class OrcSplit extends FileSplit 
   private boolean isOriginal;
   private boolean hasBase;
   private final List<Long> deltas = new ArrayList<Long>();
+  private OrcFile.WriterVersion writerVersion;
 
   static final int BASE_FLAG = 4;
   static final int ORIGINAL_FLAG = 2;
@@ -92,6 +93,7 @@ public class OrcSplit extends FileSplit 
       WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
       out.write(footerBuff.array(), footerBuff.position(),
           footerBuff.limit() - footerBuff.position());
+      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
   }
 
@@ -120,9 +122,11 @@ public class OrcSplit extends FileSplit 
       int footerBuffSize = WritableUtils.readVInt(in);
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
+      OrcFile.WriterVersion writerVersion =
+          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
 
       fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff);
+          metadataSize, footerBuff, writerVersion);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Nov 18 00:48:40 2014
@@ -129,6 +129,16 @@ public interface Reader {
   List<OrcProto.Type> getTypes();
 
   /**
+   * Get the file format version.
+   */
+  OrcFile.Version getFileVersion();
+
+  /**
+   * Get the version of the writer of this file.
+   */
+  OrcFile.WriterVersion getWriterVersion();
+
+  /**
    * Options for creating a RecordReader.
    */
   public static class Options {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -62,6 +62,7 @@ final class ReaderImpl implements Reader
   private long deserializedSize = -1;
   private final Configuration conf;
   private final List<Integer> versionList;
+  private final OrcFile.WriterVersion writerVersion;
 
   //serialized footer - Keeping this around for use by getFileMetaInfo()
   // will help avoid cpu cycles spend in deserializing at cost of increased
@@ -182,6 +183,22 @@ final class ReaderImpl implements Reader
   }
 
   @Override
+  public OrcFile.Version getFileVersion() {
+    for (OrcFile.Version version: OrcFile.Version.values()) {
+      if (version.getMajor() == versionList.get(0) &&
+          version.getMinor() == versionList.get(1)) {
+        return version;
+      }
+    }
+    return OrcFile.Version.V_0_11;
+  }
+
+  @Override
+  public OrcFile.WriterVersion getWriterVersion() {
+    return writerVersion;
+  }
+
+  @Override
   public int getRowIndexStride() {
     return footer.getRowIndexStride();
   }
@@ -309,8 +326,22 @@ final class ReaderImpl implements Reader
     this.footer = rInfo.footer;
     this.inspector = rInfo.inspector;
     this.versionList = footerMetaData.versionList;
+    this.writerVersion = footerMetaData.writerVersion;
   }
 
+  /**
+   * Get the WriterVersion based on the ORC file postscript.
+   * @param writerVersion the integer writer version
+   * @return
+   */
+  static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+    for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+      if (version.getId() == writerVersion) {
+        return version;
+      }
+    }
+    return OrcFile.WriterVersion.ORIGINAL;
+  }
 
   private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                         Path path,
@@ -346,6 +377,12 @@ final class ReaderImpl implements Reader
 
     int footerSize = (int) ps.getFooterLength();
     int metadataSize = (int) ps.getMetadataLength();
+    OrcFile.WriterVersion writerVersion;
+    if (ps.hasWriterVersion()) {
+      writerVersion =  getWriterVersion(ps.getWriterVersion());
+    } else {
+      writerVersion = OrcFile.WriterVersion.ORIGINAL;
+    }
 
     //check compression codec
     switch (ps.getCompression()) {
@@ -391,7 +428,8 @@ final class ReaderImpl implements Reader
         (int) ps.getCompressionBlockSize(),
         (int) ps.getMetadataLength(),
         buffer,
-        ps.getVersionList()
+        ps.getVersionList(),
+        writerVersion
         );
   }
 
@@ -451,25 +489,29 @@ final class ReaderImpl implements Reader
     final int metadataSize;
     final ByteBuffer footerBuffer;
     final List<Integer> versionList;
+    final OrcFile.WriterVersion writerVersion;
 
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-        ByteBuffer footerBuffer) {
-      this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+        ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
+      this(compressionType, bufferSize, metadataSize, footerBuffer, null,
+          writerVersion);
     }
 
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-                 ByteBuffer footerBuffer, List<Integer> versionList){
+                 ByteBuffer footerBuffer, List<Integer> versionList,
+                 OrcFile.WriterVersion writerVersion){
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
       this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
       this.versionList = versionList;
+      this.writerVersion = writerVersion;
     }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer, versionList);
+        metadataSize, footerByteBuffer, versionList, writerVersion);
   }
 
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -2364,20 +2364,21 @@ class RecordReaderImpl implements Record
                                       PredicateLeaf predicate) {
     ColumnStatistics cs = ColumnStatisticsImpl.deserialize(index);
     Object minValue = getMin(cs);
+    Object maxValue = getMax(cs);
+    return evaluatePredicateRange(predicate, minValue, maxValue);
+  }
+
+  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+      Object max) {
     // if we didn't have any values, everything must have been null
-    if (minValue == null) {
+    if (min == null) {
       if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
         return TruthValue.YES;
       } else {
         return TruthValue.NULL;
       }
     }
-    Object maxValue = getMax(cs);
-    return evaluatePredicateRange(predicate, minValue, maxValue);
-  }
 
-  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
-      Object max) {
     Location loc;
     try {
       // Predicate object and stats object can be one of the following base types

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.sql.Timestamp;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Nov 18 00:48:40 2014
@@ -2230,7 +2230,8 @@ class WriterImpl implements Writer, Memo
         .setMetadataLength(metadataLength)
         .setMagic(OrcFile.MAGIC)
         .addVersion(version.getMajor())
-        .addVersion(version.getMinor());
+        .addVersion(version.getMinor())
+        .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Tue Nov 18 00:48:40 2014
@@ -63,6 +63,8 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
 /**
@@ -70,13 +72,18 @@ import parquet.io.api.Binary;
  * A ParquetHiveSerDe for Hive (with the deprecated package mapred)
  *
  */
-@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+        ParquetOutputFormat.COMPRESSION})
 public class ParquetHiveSerDe extends AbstractSerDe {
   public static final Text MAP_KEY = new Text("key");
   public static final Text MAP_VALUE = new Text("value");
   public static final Text MAP = new Text("map");
   public static final Text ARRAY = new Text("bag");
 
+  // default compression type for parquet output format
+  private static final String DEFAULTCOMPRESSION =
+          ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
+
   // Map precision to the number bytes needed for binary conversion.
   public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
   static {
@@ -99,6 +106,7 @@ public class ParquetHiveSerDe extends Ab
   private LAST_OPERATION status;
   private long serializedSize;
   private long deserializedSize;
+  private String compressionType;
 
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
@@ -110,6 +118,9 @@ public class ParquetHiveSerDe extends Ab
     final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
 
+    // Get compression properties
+    compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION);
+
     if (columnNameProperty.length() == 0) {
       columnNames = new ArrayList<String>();
     } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Tue Nov 18 00:48:40 2014
@@ -18,10 +18,12 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +57,13 @@ public class ParquetRecordWriterWrapper 
       }
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
+      LOG.info("initialize serde with table properties.");
+      initializeSerProperties(taskContext, tableProperties);
+
       LOG.info("creating real writer to write at " + name);
 
-      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
-      if (compressionName != null && !compressionName.isEmpty()) {
-        //get override compression properties via "tblproperties" clause if it is set
-        LOG.debug("get override compression properties via tblproperties");
-
-        ContextUtil.getConfiguration(taskContext);
-        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
-                new Path(name), codecName);
-      } else {
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
-                new Path(name));
-      }
+      realWriter =
+              ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
 
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
@@ -77,6 +71,31 @@ public class ParquetRecordWriterWrapper 
     }
   }
 
+  private void initializeSerProperties(JobContext job, Properties tableProperties) {
+    String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
+    Configuration conf = ContextUtil.getConfiguration(job);
+    if (blockSize != null && !blockSize.isEmpty()) {
+      LOG.debug("get override parquet.block.size property via tblproperties");
+      conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
+    }
+
+    String enableDictionaryPage =
+      tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
+    if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) {
+      LOG.debug("get override parquet.enable.dictionary property via tblproperties");
+      conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY,
+        Boolean.valueOf(enableDictionaryPage));
+    }
+
+    String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+    if (compressionName != null && !compressionName.isEmpty()) {
+      //get override compression properties via "tblproperties" clause if it is set
+      LOG.debug("get override compression properties via tblproperties");
+      CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+      conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
+    }
+  }
+
   @Override
   public void close(final Reporter reporter) throws IOException {
     try {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io.sarg;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.lib;
 
 import org.apache.hadoop.hive.ql.exec.Operator;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Nov 18 00:48:40 2014
@@ -92,6 +92,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -748,8 +749,9 @@ public class Hive {
         throw new HiveException("Table name " + indexTblName + " already exists. Choose another name.");
       }
 
-      org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().deepCopy();
-      SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+      SerDeInfo serdeInfo = new SerDeInfo();
+      serdeInfo.setName(indexTblName);
+
       if(serde != null) {
         serdeInfo.setSerializationLib(serde);
       } else {
@@ -762,6 +764,7 @@ public class Hive {
         }
       }
 
+      serdeInfo.setParameters(new HashMap<String, String>());
       if (fieldDelim != null) {
         serdeInfo.getParameters().put(FIELD_DELIM, fieldDelim);
         serdeInfo.getParameters().put(SERIALIZATION_FORMAT, fieldDelim);
@@ -788,18 +791,8 @@ public class Hive {
         }
       }
 
-      storageDescriptor.setLocation(null);
-      if (location != null) {
-        storageDescriptor.setLocation(location);
-      }
-      storageDescriptor.setInputFormat(inputFormat);
-      storageDescriptor.setOutputFormat(outputFormat);
-
-      Map<String, String> params = new HashMap<String,String>();
-
       List<FieldSchema> indexTblCols = new ArrayList<FieldSchema>();
       List<Order> sortCols = new ArrayList<Order>();
-      storageDescriptor.setBucketCols(null);
       int k = 0;
       Table metaBaseTbl = new Table(baseTbl);
       for (int i = 0; i < metaBaseTbl.getCols().size(); i++) {
@@ -815,9 +808,6 @@ public class Hive {
             "Check the index columns, they should appear in the table being indexed.");
       }
 
-      storageDescriptor.setCols(indexTblCols);
-      storageDescriptor.setSortCols(sortCols);
-
       int time = (int) (System.currentTimeMillis() / 1000);
       org.apache.hadoop.hive.metastore.api.Table tt = null;
       HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass);
@@ -851,8 +841,21 @@ public class Hive {
 
       String tdname = Utilities.getDatabaseName(tableName);
       String ttname = Utilities.getTableName(tableName);
+
+      StorageDescriptor indexSd = new StorageDescriptor(
+          indexTblCols,
+          location,
+          inputFormat,
+          outputFormat,
+          false/*compressed - not used*/,
+          -1/*numBuckets - default is -1 when the table has no buckets*/,
+          serdeInfo,
+          null/*bucketCols*/,
+          sortCols,
+          null/*parameters*/);
+
       Index indexDesc = new Index(indexName, indexHandlerClass, tdname, ttname, time, time, indexTblName,
-          storageDescriptor, params, deferredRebuild);
+          indexSd, new HashMap<String,String>(), deferredRebuild);
       if (indexComment != null) {
         indexDesc.getParameters().put("comment", indexComment);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.metadata;
 
 import java.io.IOException;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Nov 18 00:48:40 2014
@@ -240,6 +240,9 @@ public class ConvertJoinMapJoin implemen
           new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
               joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
               joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+      mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
+      mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
+      mapJoinDesc.resetOrder();
     }
 
     @SuppressWarnings("unchecked")

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.Stack;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
 
 import java.util.ArrayList;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.optiq;
 
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.optiq.cost;
 
 import org.eigenbase.rel.RelCollationTraitDef;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java Tue Nov 18 00:48:40 2014
@@ -38,114 +38,121 @@ import org.eigenbase.sql.SqlKind;
 
 public abstract class HivePushFilterPastJoinRule extends PushFilterPastJoinRule {
 
-	public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();
+  public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();
 
-	public static final HivePushFilterPastJoinRule JOIN = new HivePushDownJoinConditionRule();
+  public static final HivePushFilterPastJoinRule JOIN           = new HivePushDownJoinConditionRule();
 
-	/**
-	 * Creates a PushFilterPastJoinRule with an explicit root operand.
-	 */
-	protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id,
-			boolean smart, RelFactories.FilterFactory filterFactory,
-			RelFactories.ProjectFactory projectFactory) {
-		super(operand, id, smart, filterFactory, projectFactory);
-	}
-
-	/**
-	 * Rule that tries to push filter expressions into a join condition and into
-	 * the inputs of the join.
-	 */
-	public static class HivePushFilterIntoJoinRule extends
-			HivePushFilterPastJoinRule {
-		public HivePushFilterIntoJoinRule() {
-			super(RelOptRule.operand(FilterRelBase.class,
-					RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
-					"HivePushFilterPastJoinRule:filter", true,
-					HiveFilterRel.DEFAULT_FILTER_FACTORY,
-					HiveProjectRel.DEFAULT_PROJECT_FACTORY);
-		}
-
-		@Override
-		public void onMatch(RelOptRuleCall call) {
-			FilterRelBase filter = call.rel(0);
-			JoinRelBase join = call.rel(1);
-			super.perform(call, filter, join);
-		}
-	}
-
-	public static class HivePushDownJoinConditionRule extends
-			HivePushFilterPastJoinRule {
-		public HivePushDownJoinConditionRule() {
-			super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
-					"HivePushFilterPastJoinRule:no-filter", true,
-					HiveFilterRel.DEFAULT_FILTER_FACTORY,
-					HiveProjectRel.DEFAULT_PROJECT_FACTORY);
-		}
-
-		@Override
-		public void onMatch(RelOptRuleCall call) {
-			JoinRelBase join = call.rel(0);
-			super.perform(call, null, join);
-		}
-	}
-
-	/*
-	 * Any predicates pushed down to joinFilters that aren't equality
-	 * conditions: put them back as aboveFilters because Hive doesn't support
-	 * not equi join conditions.
-	 */
-	@Override
-	protected void validateJoinFilters(List<RexNode> aboveFilters,
-			List<RexNode> joinFilters, JoinRelBase join, JoinRelType joinType) {
-		if (joinType.equals(JoinRelType.INNER)) {
-			ListIterator<RexNode> filterIter = joinFilters.listIterator();
-			while (filterIter.hasNext()) {
-				RexNode exp = filterIter.next();
-				if (exp instanceof RexCall) {
-					RexCall c = (RexCall) exp;
-					if (c.getOperator().getKind() == SqlKind.EQUALS) {
-						boolean validHiveJoinFilter = true;
-						for (RexNode rn : c.getOperands()) {
-							// NOTE: Hive dis-allows projections from both left
-							// &
-							// right side
-							// of join condition. Example: Hive disallows
-							// (r1.x=r2.x)=(r1.y=r2.y) on join condition.
-							if (filterRefersToBothSidesOfJoin(rn, join)) {
-								validHiveJoinFilter = false;
-								break;
-							}
-						}
-						if (validHiveJoinFilter)
-							continue;
-					}
-				}
-				aboveFilters.add(exp);
-				filterIter.remove();
-			}
-		}
-	}
-
-	private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
-		boolean refersToBothSides = false;
-
-		int joinNoOfProjects = j.getRowType().getFieldCount();
-		BitSet filterProjs = new BitSet(joinNoOfProjects);
-		BitSet allLeftProjs = new BitSet(joinNoOfProjects);
-		BitSet allRightProjs = new BitSet(joinNoOfProjects);
-		allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
-		allRightProjs.set(j.getInput(0).getRowType().getFieldCount(),
-				joinNoOfProjects, true);
-
-		InputFinder inputFinder = new InputFinder(filterProjs);
-		filter.accept(inputFinder);
-
-		if (allLeftProjs.intersects(filterProjs)
-				&& allRightProjs.intersects(filterProjs))
-			refersToBothSides = true;
+  /**
+   * Creates a PushFilterPastJoinRule with an explicit root operand.
+   */
+  protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id, boolean smart,
+      RelFactories.FilterFactory filterFactory, RelFactories.ProjectFactory projectFactory) {
+    super(operand, id, smart, filterFactory, projectFactory);
+  }
+
+  /**
+   * Rule that tries to push filter expressions into a join condition and into
+   * the inputs of the join.
+   */
+  public static class HivePushFilterIntoJoinRule extends HivePushFilterPastJoinRule {
+    public HivePushFilterIntoJoinRule() {
+      super(RelOptRule.operand(FilterRelBase.class,
+          RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
+          "HivePushFilterPastJoinRule:filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      FilterRelBase filter = call.rel(0);
+      JoinRelBase join = call.rel(1);
+      super.perform(call, filter, join);
+    }
+  }
+
+  public static class HivePushDownJoinConditionRule extends HivePushFilterPastJoinRule {
+    public HivePushDownJoinConditionRule() {
+      super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
+          "HivePushFilterPastJoinRule:no-filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      JoinRelBase join = call.rel(0);
+      super.perform(call, null, join);
+    }
+  }
+
+  /*
+   * Any predicates pushed down to joinFilters that aren't equality conditions:
+   * put them back as aboveFilters because Hive doesn't support non-equi join
+   * conditions.
+   */
+  @Override
+  protected void validateJoinFilters(List<RexNode> aboveFilters, List<RexNode> joinFilters,
+      JoinRelBase join, JoinRelType joinType) {
+    if (joinType.equals(JoinRelType.INNER)) {
+      ListIterator<RexNode> filterIter = joinFilters.listIterator();
+      while (filterIter.hasNext()) {
+        RexNode exp = filterIter.next();
+
+        if (exp instanceof RexCall) {
+          RexCall c = (RexCall) exp;
+          boolean validHiveJoinFilter = false;
+
+          if ((c.getOperator().getKind() == SqlKind.EQUALS)) {
+            validHiveJoinFilter = true;
+            for (RexNode rn : c.getOperands()) {
+              // NOTE: Hive dis-allows projections from both left & right side
+              // of join condition. Example: Hive disallows
+              // (r1.x +r2.x)=(r1.y+r2.y) on join condition.
+              if (filterRefersToBothSidesOfJoin(rn, join)) {
+                validHiveJoinFilter = false;
+                break;
+              }
+            }
+          } else if ((c.getOperator().getKind() == SqlKind.LESS_THAN)
+              || (c.getOperator().getKind() == SqlKind.GREATER_THAN)
+              || (c.getOperator().getKind() == SqlKind.LESS_THAN_OR_EQUAL)
+              || (c.getOperator().getKind() == SqlKind.GREATER_THAN_OR_EQUAL)) {
+            validHiveJoinFilter = true;
+            // NOTE: Hive disallows projections from both the left & right side
+            // of the join in an inequality condition. Example: Hive disallows
+            // (r1.x < r2.x) on join condition.
+            if (filterRefersToBothSidesOfJoin(c, join)) {
+              validHiveJoinFilter = false;
+            }
+          }
+
+          if (validHiveJoinFilter)
+            continue;
+        }
+
+        aboveFilters.add(exp);
+        filterIter.remove();
+      }
+    }
+  }
+
+  private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
+    boolean refersToBothSides = false;
+
+    int joinNoOfProjects = j.getRowType().getFieldCount();
+    BitSet filterProjs = new BitSet(joinNoOfProjects);
+    BitSet allLeftProjs = new BitSet(joinNoOfProjects);
+    BitSet allRightProjs = new BitSet(joinNoOfProjects);
+    allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
+    allRightProjs.set(j.getInput(0).getRowType().getFieldCount(), joinNoOfProjects, true);
 
-		return refersToBothSides;
-	}
+    InputFinder inputFinder = new InputFinder(filterProjs);
+    filter.accept(inputFinder);
+
+    if (allLeftProjs.intersects(filterProjs) && allRightProjs.intersects(filterProjs))
+      refersToBothSides = true;
+
+    return refersToBothSides;
+  }
 }
 
 // End PushFilterPastJoinRule.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java Tue Nov 18 00:48:40 2014
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
 
 import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hive.ql.optimizer.optiq.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
@@ -32,6 +35,10 @@ import org.eigenbase.rex.RexInputRef;
 import org.eigenbase.rex.RexNode;
 import org.eigenbase.rex.RexVisitorImpl;
 import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.type.SqlTypeUtil;
+
+import com.google.common.collect.Sets;
 
 public class FilterSelectivityEstimator extends RexVisitorImpl<Double> {
   private final RelNode childRel;
@@ -61,7 +68,7 @@ public class FilterSelectivityEstimator 
     }
 
     Double selectivity = null;
-    SqlKind op = call.getKind();
+    SqlKind op = getOp(call);
 
     switch (op) {
     case AND: {
@@ -74,6 +81,7 @@ public class FilterSelectivityEstimator 
       break;
     }
 
+    case NOT:
     case NOT_EQUALS: {
       selectivity = computeNotEqualitySelectivity(call);
       break;
@@ -88,7 +96,16 @@ public class FilterSelectivityEstimator 
     }
 
     case IN: {
-      selectivity = ((double) 1 / ((double) call.operands.size()));
+      // TODO: 1) check for duplicates 2) We assume IN clause values to be
+      // present in NDV which may not be correct (Range check can find it) 3) We
+      // assume values in NDV set are uniformly distributed over col values
+      // (account for skewness - histogram).
+      selectivity = computeFunctionSelectivity(call) * (call.operands.size() - 1);
+      if (selectivity <= 0.0) {
+        selectivity = 0.10;
+      } else if (selectivity >= 1.0) {
+        selectivity = 1.0;
+      }
       break;
     }
 
@@ -152,18 +169,19 @@ public class FilterSelectivityEstimator 
       }
       tmpCardinality = childCardinality * tmpSelectivity;
 
-      if (tmpCardinality > 1)
+      if (tmpCardinality > 1 && tmpCardinality < childCardinality) {
         tmpSelectivity = (1 - tmpCardinality / childCardinality);
-      else
+      } else {
         tmpSelectivity = 1.0;
+      }
 
       selectivity *= tmpSelectivity;
     }
 
-    if (selectivity > 1)
-      return (1 - selectivity);
-    else
-      return 1.0;
+    if (selectivity < 0.0)
+      selectivity = 0.0;
+
+    return (1 - selectivity);
   }
 
   /**
@@ -225,4 +243,19 @@ public class FilterSelectivityEstimator 
     }
     return false;
   }
+
+  private SqlKind getOp(RexCall call) {
+    SqlKind op = call.getKind();
+
+    if (call.getKind().equals(SqlKind.OTHER_FUNCTION)
+        && SqlTypeUtil.inBooleanFamily(call.getType())) {
+      SqlOperator sqlOp = call.getOperator();
+      String opName = (sqlOp != null) ? sqlOp.getName() : "";
+      if (opName.equalsIgnoreCase("in")) {
+        op = SqlKind.IN;
+      }
+    }
+
+    return op;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ import org.eigenbase.rel.metadata.Metada
 import org.eigenbase.rel.metadata.ReflectiveRelMetadataProvider;
 import org.eigenbase.rel.metadata.RelMdUniqueKeys;
 import org.eigenbase.rel.metadata.RelMetadataProvider;
+import org.eigenbase.relopt.RelOptUtil;
 import org.eigenbase.relopt.hep.HepRelVertex;
 import org.eigenbase.rex.RexInputRef;
 import org.eigenbase.rex.RexNode;
@@ -100,7 +101,7 @@ public class HiveRelMdUniqueKeys {
           cStat.getRange().minValue != null) {
         double r = cStat.getRange().maxValue.doubleValue() - 
             cStat.getRange().minValue.doubleValue() + 1;
-        isKey = (numRows == r);
+        isKey = (Math.abs(numRows - r) < RelOptUtil.EPSILON);
       }
       if ( isKey ) {
         BitSet key = new BitSet();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Tue Nov 18 00:48:40 2014
@@ -278,6 +278,7 @@ public class SqlFunctionConverter {
       registerFunction(">=", SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
           hToken(HiveParser.GREATERTHANOREQUALTO, ">="));
       registerFunction("!", SqlStdOperatorTable.NOT, hToken(HiveParser.KW_NOT, "not"));
+      registerFunction("<>", SqlStdOperatorTable.NOT_EQUALS, hToken(HiveParser.NOTEQUAL, "<>"));
     }
 
     private void registerFunction(String name, SqlOperator optiqFn, HiveToken hiveToken) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java Tue Nov 18 00:48:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 public class PartExprEvalUtils {
@@ -103,11 +104,13 @@ public class PartExprEvalUtils {
   }
 
   static synchronized public ObjectPair<PrimitiveObjectInspector, ExprNodeEvaluator> prepareExpr(
-      ExprNodeGenericFuncDesc expr, List<String> partNames) throws HiveException {
+      ExprNodeGenericFuncDesc expr, List<String> partNames,
+      List<PrimitiveTypeInfo> partColumnTypeInfos) throws HiveException {
     // Create the row object
     List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
     for (int i = 0; i < partNames.size(); i++) {
-      partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+      partObjectInspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+          partColumnTypeInfos.get(i)));
     }
     StructObjectInspector objectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(partNames, partObjectInspectors);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 
 /**
  * The basic implementation of PartitionExpressionProxy that uses ql package classes.
@@ -40,13 +41,14 @@ public class PartitionExpressionForMetas
   }
 
   @Override
-  public boolean filterPartitionsByExpr(List<String> columnNames, byte[] exprBytes,
+  public boolean filterPartitionsByExpr(List<String> partColumnNames,
+      List<PrimitiveTypeInfo> partColumnTypeInfos, byte[] exprBytes,
       String defaultPartitionName, List<String> partitionNames) throws MetaException {
     ExprNodeGenericFuncDesc expr = deserializeExpr(exprBytes);
     try {
       long startTime = System.nanoTime(), len = partitionNames.size();
       boolean result = PartitionPruner.prunePartitionNames(
-          columnNames, expr, defaultPartitionName, partitionNames);
+          partColumnNames, partColumnTypeInfos, expr, defaultPartitionName, partitionNames);
       double timeMs = (System.nanoTime() - startTime) / 1000000.0;
       LOG.debug("Pruning " + len + " partition names took " + timeMs + "ms");
       return result;