Posted to commits@hive.apache.org by br...@apache.org on 2014/11/18 01:48:46 UTC
svn commit: r1640263 [6/12] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ accu...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 18 00:48:40 2014
@@ -1135,7 +1135,9 @@ public class FileSinkOperator extends Te
String postfix=null;
if (taskIndependent) {
// key = "database.table/SP/DP/"LB/
- prefix = conf.getTableInfo().getTableName();
+ // Hive stores lowercase table names in the metastore, and Counters are case sensitive, so we
+ // use the lowercase table name as the prefix here, since StatsTask gets the table name from the metastore to fetch the counter.
+ prefix = conf.getTableInfo().getTableName().toLowerCase();
} else {
// key = "prefix/SP/DP/"LB/taskID/
prefix = conf.getStatsAggPrefix();
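
For context, a minimal self-contained sketch of why the lowercase prefix matters (the HashMap stands in for Hadoop Counters; all names here are illustrative, not part of the patch). Counter keys are matched exactly, so the publishing side (FileSinkOperator) and the fetching side (StatsTask) must normalize the table name the same way:

import java.util.HashMap;
import java.util.Map;

public class CounterKeySketch {
  public static void main(String[] args) {
    Map<String, Long> counters = new HashMap<String, Long>();
    // FileSinkOperator publishes the row count under the lowercase name...
    String publishKey = "Default.MyTable".toLowerCase() + "/SP/DP/";
    counters.put(publishKey, 42L);
    // ...and StatsTask fetches with the metastore's (already lowercase) name,
    // so the keys match only because both sides lowercase.
    System.out.println(counters.get("default.mytable/SP/DP/")); // 42
  }
}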
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -33,6 +33,6 @@ public interface HashTableLoader {
void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp);
- void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
- throws HiveException;
+ void load(MapJoinTableContainer[] mapJoinTables,
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs
}
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
loader.init(getExecContext(), hconf, this);
- loader.load(mapJoinTables, mapJoinTableSerdes);
+ long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
+ * conf.getHashTableMemoryUsage());
+ loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
if (!conf.isBucketMapJoin()) {
/*
* The issue with caching in case of bucket map join is that different tasks
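
A minimal sketch of how the new memUsage budget is derived (the 0.55 fraction is a made-up stand-in for conf.getHashTableMemoryUsage(); the 200MB fallback mirrors MapJoinMemoryExhaustionHandler below):

import java.lang.management.ManagementFactory;

public class MemBudgetSketch {
  public static void main(String[] args) {
    long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    if (maxHeap == -1) {            // getMax() is allowed to be undefined
      maxHeap = 200L * 1024 * 1024; // same 200MB default the handler uses
    }
    double fraction = 0.55;         // stands in for conf.getHashTableMemoryUsage()
    long memUsage = (long) (maxHeap * fraction);
    System.out.println("hash table budget: " + memUsage + " bytes");
  }
}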
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Tue Nov 18 00:48:40 2014
@@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand
this.console = console;
this.maxMemoryUsage = maxMemoryUsage;
this.memoryMXBean = ManagementFactory.getMemoryMXBean();
- long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+ this.maxHeapSize = getMaxHeapSize(memoryMXBean);
+ percentageNumberFormat = NumberFormat.getInstance();
+ percentageNumberFormat.setMinimumFractionDigits(2);
+ LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+ }
+
+ public static long getMaxHeapSize() {
+ return getMaxHeapSize(ManagementFactory.getMemoryMXBean());
+ }
+
+ private static long getMaxHeapSize(MemoryMXBean bean) {
+ long maxHeapSize = bean.getHeapMemoryUsage().getMax();
/*
* According to the javadoc, getMax() can return -1. In this case
* default to 200MB. This will probably never actually happen.
*/
if(maxHeapSize == -1) {
- this.maxHeapSize = 200L * 1024L * 1024L;
LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
"defaulting maxHeapSize to 200MB");
- } else {
- this.maxHeapSize = maxHeapSize;
+ return 200L * 1024L * 1024L;
}
- percentageNumberFormat = NumberFormat.getInstance();
- percentageNumberFormat.setMinimumFractionDigits(2);
- LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+ return maxHeapSize;
}
+
/**
* Throws MapJoinMemoryExhaustionException when the JVM has consumed the
* configured percentage of memory. The arguments are used simply for the error
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -72,7 +72,7 @@ public class HashTableLoader implements
@Override
public void load(
MapJoinTableContainer[] mapJoinTables,
- MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
String currentInputPath = context.getCurrentInputPath().toString();
LOG.info("******* Load from HashTable for input file: " + currentInputPath);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Tue Nov 18 00:48:40 2014
@@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa
/** We have 39 bits to store list pointer from the first record; this is size limit */
final static long MAX_WB_SIZE = ((long)1) << 38;
+ /** 8 GB of refs is the max capacity if no memory limit is specified. If someone has 100s of
+ * GBs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
+ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
- public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+ public BytesBytesMultiHashMap(int initialCapacity,
+ float loadFactor, int wbSize, long memUsage, int defaultCapacity) {
if (loadFactor < 0 || loadFactor > 1) {
throw new AssertionError("Load factor must be between (0, 1].");
}
+ assert initialCapacity > 0;
initialCapacity = (Long.bitCount(initialCapacity) == 1)
? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+ // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
+ int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
+ : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+ if (maxCapacity < initialCapacity || initialCapacity <= 0) {
+ // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
+ initialCapacity = (Long.bitCount(maxCapacity) == 1)
+ ? maxCapacity : nextLowestPowerOfTwo(maxCapacity);
+ }
+
validateCapacity(initialCapacity);
startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
this.loadFactor = loadFactor;
@@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa
resizeThreshold = (int)(initialCapacity * this.loadFactor);
}
+ @VisibleForTesting
+ BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+ this(initialCapacity, loadFactor, wbSize, -1, 100000);
+ }
+
/** The source of keys and values to put into hashtable; avoids byte copying. */
public static interface KvSource {
/** Write key into output. */
@@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa
return Integer.highestOneBit(v) << 1;
}
+ private static int nextLowestPowerOfTwo(int v) {
+ return Integer.highestOneBit(v);
+ }
+
@VisibleForTesting
int getCapacity() {
return refs.length;
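
A self-contained sketch of the capacity clamping above, assuming the same 8-bytes-per-ref accounting (class and method names are illustrative):

public class CapacityClampSketch {
  private static final int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;

  // The memory budget allows at most memUsage / 8 ref slots; if the requested
  // capacity exceeds that, round the budget down to a power of two.
  static int clampCapacity(int initialCapacity, long memUsage) {
    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
        : (int) Math.min((long) DEFAULT_MAX_CAPACITY, memUsage / 8);
    if (maxCapacity < initialCapacity || initialCapacity <= 0) {
      return Integer.highestOneBit(maxCapacity); // nextLowestPowerOfTwo
    }
    return initialCapacity;
  }

  public static void main(String[] args) {
    // A 1MB budget holds 131072 refs, already a power of two.
    System.out.println(clampCapacity(1 << 20, 1L << 20)); // 131072
  }
}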
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.exec.persistence;
@@ -60,17 +78,20 @@ public class MapJoinBytesTableContainer
private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
public MapJoinBytesTableContainer(Configuration hconf,
- MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
+ MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException {
this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
- HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount);
+ HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+ valCtx, keyCount, memUsage);
}
private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor,
- int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
- threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
- hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize);
+ int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage)
+ throws SerDeException {
+ int newThreshold = HashMapWrapper.calculateTableSize(
+ keyCountAdj, threshold, loadFactor, keyCount);
+ hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold);
}
private LazyBinaryStructObjectInspector createInternalOi(
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -69,7 +70,7 @@ public class HashTableLoader implements
@Override
public void load(
MapJoinTableContainer[] mapJoinTables,
- MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
TezContext tezContext = (TezContext) MapredContext.get();
Map<Integer, String> parentToInput = desc.getParentToInput();
@@ -106,7 +107,7 @@ public class HashTableLoader implements
Long keyCountObj = parentKeyCounts.get(pos);
long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
MapJoinTableContainer tableContainer = useOptimizedTables
- ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount)
+ ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
: new HashMapWrapper(hconf, keyCount);
while (kvReader.next()) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Tue Nov 18 00:48:40 2014
@@ -45,6 +45,8 @@ import org.apache.tez.dag.api.client.Sta
import org.apache.tez.dag.api.client.VertexStatus;
import org.fusesource.jansi.Ansi;
+import com.google.common.base.Preconditions;
+
import java.io.IOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
@@ -132,6 +134,11 @@ public class TezJobMonitor {
});
}
+ public static void initShutdownHook() {
+ Preconditions.checkNotNull(shutdownList,
+ "Shutdown hook was not properly initialized");
+ }
+
public TezJobMonitor() {
console = SessionState.getConsole();
secondsFormat = new DecimalFormat("#0.00");
@@ -290,6 +297,7 @@ public class TezJobMonitor {
break;
case INITING:
console.printInfo("Status: Initializing");
+ startTime = System.currentTimeMillis();
break;
case RUNNING:
if (!running) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Tue Nov 18 00:48:40 2014
@@ -187,6 +187,7 @@ public class TezSessionState {
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
+ TezJobMonitor.initShutdownHook();
session.start();
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
@@ -170,7 +171,8 @@ public class TezTask extends Task<TezWor
counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
TezSessionPoolManager.getInstance().returnSession(session);
- if (LOG.isInfoEnabled() && counters != null) {
+ if (LOG.isInfoEnabled() && counters != null
+ && conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
for (CounterGroup group: counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -89,6 +90,8 @@ public class ATSHook implements ExecuteW
@Override
public void run(final HookContext hookContext) throws Exception {
final long currentTime = System.currentTimeMillis();
+ final HiveConf conf = new HiveConf(hookContext.getConf());
+
executor.submit(new Runnable() {
@Override
public void run() {
@@ -110,19 +113,19 @@ public class ATSHook implements ExecuteW
switch(hookContext.getHookType()) {
case PRE_EXEC_HOOK:
ExplainTask explain = new ExplainTask();
- explain.initialize(hookContext.getConf(), plan, null);
+ explain.initialize(conf, plan, null);
String query = plan.getQueryStr();
List<Task<?>> rootTasks = plan.getRootTasks();
JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks,
plan.getFetchTask(), true, false, false);
- fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query,
+ fireAndForget(conf, createPreHookEvent(queryId, query,
explainPlan, queryStartTime, user, numMrJobs, numTezJobs));
break;
case POST_EXEC_HOOK:
- fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, true));
+ fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true));
break;
case ON_FAILURE_HOOK:
- fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, false));
+ fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false));
break;
default:
//ignore
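
The conf copy above matters because the hook body runs asynchronously on an executor after run() returns. A minimal sketch of the capture pattern, with java.util.Properties standing in for HiveConf (illustrative only):

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConfCaptureSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Properties shared = new Properties();   // stands in for hookContext.getConf()
    shared.setProperty("key", "at-submit-time");
    final Properties copy = new Properties();
    copy.putAll(shared);                    // like `new HiveConf(hookContext.getConf())`
    executor.submit(new Runnable() {
      public void run() {
        System.out.println(copy.getProperty("key")); // always "at-submit-time"
      }
    });
    shared.setProperty("key", "mutated-later");      // cannot affect the captured copy
    executor.shutdown();
  }
}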
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Tue Nov 18 00:48:40 2014
@@ -394,7 +394,8 @@ class ColumnStatisticsImpl implements Co
} else if (str.minimum != null) {
if (minimum.compareTo(str.minimum) > 0) {
minimum = new Text(str.getMinimum());
- } else if (maximum.compareTo(str.maximum) < 0) {
+ }
+ if (maximum.compareTo(str.maximum) < 0) {
maximum = new Text(str.getMaximum());
}
}
@@ -563,7 +564,8 @@ class ColumnStatisticsImpl implements Co
} else if (dec.minimum != null) {
if (minimum.compareTo(dec.minimum) > 0) {
minimum = dec.minimum;
- } else if (maximum.compareTo(dec.maximum) < 0) {
+ }
+ if (maximum.compareTo(dec.maximum) < 0) {
maximum = dec.maximum;
}
if (sum == null || dec.sum == null) {
@@ -671,7 +673,8 @@ class ColumnStatisticsImpl implements Co
} else if (dateStats.minimum != null) {
if (minimum > dateStats.minimum) {
minimum = dateStats.minimum;
- } else if (maximum < dateStats.maximum) {
+ }
+ if (maximum < dateStats.maximum) {
maximum = dateStats.maximum;
}
}
@@ -767,7 +770,8 @@ class ColumnStatisticsImpl implements Co
} else if (timestampStats.minimum != null) {
if (minimum > timestampStats.minimum) {
minimum = timestampStats.minimum;
- } else if (maximum < timestampStats.maximum) {
+ }
+ if (maximum < timestampStats.maximum) {
maximum = timestampStats.maximum;
}
}
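
A worked example of the bug these four hunks fix: with `else if`, the maximum was only examined when the minimum did not move, so merging two ranges that should widen on both ends silently dropped the new maximum (the statistics corruption tracked by HIVE-8732). Illustrative sketch:

public class MergeStatsSketch {
  static int min = 5, max = 10;

  // Buggy merge: the max branch is skipped whenever min is updated.
  static void mergeBuggy(int otherMin, int otherMax) {
    if (min > otherMin) {
      min = otherMin;
    } else if (max < otherMax) { // never reached when min just changed
      max = otherMax;
    }
  }

  public static void main(String[] args) {
    mergeBuggy(1, 20);
    System.out.println(min + ".." + max); // prints 1..10 -- max=20 was lost
  }
}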
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Nov 18 00:48:40 2014
@@ -65,6 +65,8 @@ public final class FileDump {
System.out.println("Structure for " + filename);
Path path = new Path(filename);
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ System.out.println("File Version: " + reader.getFileVersion().getName() +
+ " with " + reader.getWriterVersion());
RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
System.out.println("Rows: " + reader.getNumberOfRows());
System.out.println("Compression: " + reader.getCompression());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Nov 18 00:48:40 2014
@@ -97,6 +97,26 @@ public final class OrcFile {
}
}
+ /**
+ * Records the version of the writer in terms of which bugs have been fixed.
+ * For bugs in the writer where the old readers already read the new data
+ * correctly, bump this version instead of the file format Version.
+ */
+ public static enum WriterVersion {
+ ORIGINAL(0),
+ HIVE_8732(1); // corrupted stripe/file maximum column statistics
+
+ private final int id;
+
+ public int getId() {
+ return id;
+ }
+
+ private WriterVersion(int id) {
+ this.id = id;
+ }
+ }
+
public static enum EncodingStrategy {
SPEED, COMPRESSION;
}
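
A minimal sketch of how the new enum is meant to be consumed (illustrative; mirrors the lookup-with-fallback that ReaderImpl adds below). Readers compare the stamped id against ORIGINAL to decide whether stripe statistics can be trusted for pruning:

public class WriterVersionSketch {
  enum WriterVersion {
    ORIGINAL(0), HIVE_8732(1);
    final int id;
    WriterVersion(int id) { this.id = id; }
  }

  // Unknown or missing ids fall back to ORIGINAL, i.e. "assume the old bugs".
  static WriterVersion fromId(int id) {
    for (WriterVersion v : WriterVersion.values()) {
      if (v.id == id) {
        return v;
      }
    }
    return WriterVersion.ORIGINAL;
  }

  public static void main(String[] args) {
    System.out.println(fromId(1) != WriterVersion.ORIGINAL); // true: stats usable for pruning
  }
}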
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Nov 18 00:48:40 2014
@@ -630,6 +630,7 @@ public class OrcInputFormat implements
private final boolean isOriginal;
private final List<Long> deltas;
private final boolean hasBase;
+ private OrcFile.WriterVersion writerVersion;
SplitGenerator(Context context, FileSystem fs,
FileStatus file, FileInfo fileInfo,
@@ -775,7 +776,9 @@ public class OrcInputFormat implements
Reader.Options options = new Reader.Options();
setIncludedColumns(options, types, context.conf, isOriginal);
setSearchArgument(options, types, context.conf, isOriginal);
- if (options.getSearchArgument() != null) {
+ // only do split pruning if HIVE-8732 has been fixed in the writer
+ if (options.getSearchArgument() != null &&
+ writerVersion != OrcFile.WriterVersion.ORIGINAL) {
SearchArgument sarg = options.getSearchArgument();
List<PredicateLeaf> sargLeaves = sarg.getLeaves();
List<StripeStatistics> stripeStats = metadata.getStripeStatistics();
@@ -866,6 +869,7 @@ public class OrcInputFormat implements
fileMetaInfo = fileInfo.fileMetaInfo;
metadata = fileInfo.metadata;
types = fileInfo.types;
+ writerVersion = fileInfo.writerVersion;
// For multiple runs, in case sendSplitsInFooter changes
if (fileMetaInfo == null && context.footerInSplits) {
orcReader = OrcFile.createReader(file.getPath(),
@@ -873,6 +877,7 @@ public class OrcInputFormat implements
fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
fileInfo.metadata = orcReader.getMetadata();
fileInfo.types = orcReader.getTypes();
+ fileInfo.writerVersion = orcReader.getWriterVersion();
}
} else {
orcReader = OrcFile.createReader(file.getPath(),
@@ -880,13 +885,14 @@ public class OrcInputFormat implements
stripes = orcReader.getStripes();
metadata = orcReader.getMetadata();
types = orcReader.getTypes();
+ writerVersion = orcReader.getWriterVersion();
fileMetaInfo = context.footerInSplits ?
((ReaderImpl) orcReader).getFileMetaInfo() : null;
if (context.cacheStripeDetails) {
// Populate into cache.
Context.footerCache.put(file.getPath(),
new FileInfo(file.getModificationTime(), file.getLen(), stripes,
- metadata, types, fileMetaInfo));
+ metadata, types, fileMetaInfo, writerVersion));
}
}
} catch (Throwable th) {
@@ -981,18 +987,21 @@ public class OrcInputFormat implements
ReaderImpl.FileMetaInfo fileMetaInfo;
Metadata metadata;
List<OrcProto.Type> types;
+ private OrcFile.WriterVersion writerVersion;
FileInfo(long modificationTime, long size,
List<StripeInformation> stripeInfos,
Metadata metadata, List<OrcProto.Type> types,
- ReaderImpl.FileMetaInfo fileMetaInfo) {
+ ReaderImpl.FileMetaInfo fileMetaInfo,
+ OrcFile.WriterVersion writerVersion) {
this.modificationTime = modificationTime;
this.size = size;
this.stripeInfos = stripeInfos;
this.fileMetaInfo = fileMetaInfo;
this.metadata = metadata;
this.types = types;
+ this.writerVersion = writerVersion;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ public class OrcNewSplit extends FileSpl
private boolean isOriginal;
private boolean hasBase;
private final List<Long> deltas = new ArrayList<Long>();
+ private OrcFile.WriterVersion writerVersion;
protected OrcNewSplit(){
//The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -83,6 +84,7 @@ public class OrcNewSplit extends FileSpl
WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
out.write(footerBuff.array(), footerBuff.position(),
footerBuff.limit() - footerBuff.position());
+ WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
}
}
@@ -111,9 +113,11 @@ public class OrcNewSplit extends FileSpl
int footerBuffSize = WritableUtils.readVInt(in);
ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
in.readFully(footerBuff.array(), 0, footerBuffSize);
+ OrcFile.WriterVersion writerVersion =
+ ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
- metadataSize, footerBuff);
+ metadataSize, footerBuff, writerVersion);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Tue Nov 18 00:48:40 2014
@@ -42,6 +42,7 @@ public class OrcSplit extends FileSplit
private boolean isOriginal;
private boolean hasBase;
private final List<Long> deltas = new ArrayList<Long>();
+ private OrcFile.WriterVersion writerVersion;
static final int BASE_FLAG = 4;
static final int ORIGINAL_FLAG = 2;
@@ -92,6 +93,7 @@ public class OrcSplit extends FileSplit
WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
out.write(footerBuff.array(), footerBuff.position(),
footerBuff.limit() - footerBuff.position());
+ WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
}
}
@@ -120,9 +122,11 @@ public class OrcSplit extends FileSplit
int footerBuffSize = WritableUtils.readVInt(in);
ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
in.readFully(footerBuff.array(), 0, footerBuffSize);
+ OrcFile.WriterVersion writerVersion =
+ ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
- metadataSize, footerBuff);
+ metadataSize, footerBuff, writerVersion);
}
}
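
Both split classes now round-trip the writer version id alongside the shipped footer. A rough self-contained sketch of the framing (java.io stands in for Hadoop's WritableUtils vints; illustrative only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SplitVersionSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(1); // stands in for WritableUtils.writeVInt(out, writerVersion.getId())

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    int id = in.readInt(); // read side: ReaderImpl.getWriterVersion(WritableUtils.readVInt(in))
    System.out.println("writer version id: " + id);
  }
}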
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Nov 18 00:48:40 2014
@@ -129,6 +129,16 @@ public interface Reader {
List<OrcProto.Type> getTypes();
/**
+ * Get the file format version.
+ */
+ OrcFile.Version getFileVersion();
+
+ /**
+ * Get the version of the writer of this file.
+ */
+ OrcFile.WriterVersion getWriterVersion();
+
+ /**
* Options for creating a RecordReader.
*/
public static class Options {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -62,6 +62,7 @@ final class ReaderImpl implements Reader
private long deserializedSize = -1;
private final Configuration conf;
private final List<Integer> versionList;
+ private final OrcFile.WriterVersion writerVersion;
//serialized footer - Keeping this around for use by getFileMetaInfo()
// will help avoid cpu cycles spend in deserializing at cost of increased
@@ -182,6 +183,22 @@ final class ReaderImpl implements Reader
}
@Override
+ public OrcFile.Version getFileVersion() {
+ for (OrcFile.Version version: OrcFile.Version.values()) {
+ if (version.getMajor() == versionList.get(0) &&
+ version.getMinor() == versionList.get(1)) {
+ return version;
+ }
+ }
+ return OrcFile.Version.V_0_11;
+ }
+
+ @Override
+ public OrcFile.WriterVersion getWriterVersion() {
+ return writerVersion;
+ }
+
+ @Override
public int getRowIndexStride() {
return footer.getRowIndexStride();
}
@@ -309,8 +326,22 @@ final class ReaderImpl implements Reader
this.footer = rInfo.footer;
this.inspector = rInfo.inspector;
this.versionList = footerMetaData.versionList;
+ this.writerVersion = footerMetaData.writerVersion;
}
+ /**
+ * Get the WriterVersion based on the ORC file postscript.
+ * @param writerVersion the integer writer version
+ * @return the corresponding WriterVersion, or ORIGINAL if the id is not recognized
+ */
+ static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+ for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+ if (version.getId() == writerVersion) {
+ return version;
+ }
+ }
+ return OrcFile.WriterVersion.ORIGINAL;
+ }
private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
Path path,
@@ -346,6 +377,12 @@ final class ReaderImpl implements Reader
int footerSize = (int) ps.getFooterLength();
int metadataSize = (int) ps.getMetadataLength();
+ OrcFile.WriterVersion writerVersion;
+ if (ps.hasWriterVersion()) {
+ writerVersion = getWriterVersion(ps.getWriterVersion());
+ } else {
+ writerVersion = OrcFile.WriterVersion.ORIGINAL;
+ }
//check compression codec
switch (ps.getCompression()) {
@@ -391,7 +428,8 @@ final class ReaderImpl implements Reader
(int) ps.getCompressionBlockSize(),
(int) ps.getMetadataLength(),
buffer,
- ps.getVersionList()
+ ps.getVersionList(),
+ writerVersion
);
}
@@ -451,25 +489,29 @@ final class ReaderImpl implements Reader
final int metadataSize;
final ByteBuffer footerBuffer;
final List<Integer> versionList;
+ final OrcFile.WriterVersion writerVersion;
FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer) {
- this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+ ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
+ this(compressionType, bufferSize, metadataSize, footerBuffer, null,
+ writerVersion);
}
FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer, List<Integer> versionList){
+ ByteBuffer footerBuffer, List<Integer> versionList,
+ OrcFile.WriterVersion writerVersion){
this.compressionType = compressionType;
this.bufferSize = bufferSize;
this.metadataSize = metadataSize;
this.footerBuffer = footerBuffer;
this.versionList = versionList;
+ this.writerVersion = writerVersion;
}
}
public FileMetaInfo getFileMetaInfo(){
return new FileMetaInfo(compressionKind.toString(), bufferSize,
- metadataSize, footerByteBuffer, versionList);
+ metadataSize, footerByteBuffer, versionList, writerVersion);
}
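
For the file format version (as opposed to the writer version), getFileVersion scans the known versions and defaults to 0.11 for files written before version lists existed. A sketch of that shape (enum values here are trimmed stand-ins):

import java.util.Arrays;
import java.util.List;

public class FileVersionSketch {
  enum Version {
    V_0_11(0, 11), V_0_12(0, 12);
    final int major, minor;
    Version(int major, int minor) { this.major = major; this.minor = minor; }
  }

  // Same shape as ReaderImpl.getFileVersion: scan known versions, default to 0.11.
  static Version fromVersionList(List<Integer> versionList) {
    for (Version v : Version.values()) {
      if (v.major == versionList.get(0) && v.minor == versionList.get(1)) {
        return v;
      }
    }
    return Version.V_0_11;
  }

  public static void main(String[] args) {
    System.out.println(fromVersionList(Arrays.asList(0, 12))); // V_0_12
  }
}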
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -2364,20 +2364,21 @@ class RecordReaderImpl implements Record
PredicateLeaf predicate) {
ColumnStatistics cs = ColumnStatisticsImpl.deserialize(index);
Object minValue = getMin(cs);
+ Object maxValue = getMax(cs);
+ return evaluatePredicateRange(predicate, minValue, maxValue);
+ }
+
+ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+ Object max) {
// if we didn't have any values, everything must have been null
- if (minValue == null) {
+ if (min == null) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
return TruthValue.YES;
} else {
return TruthValue.NULL;
}
}
- Object maxValue = getMax(cs);
- return evaluatePredicateRange(predicate, minValue, maxValue);
- }
- static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
- Object max) {
Location loc;
try {
// Predicate object and stats object can be one of the following base types
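
The hoisted null check means an all-null column (no minimum recorded) answers IS_NULL with YES and every other predicate with NULL. A compact sketch of that decision (the enums are trimmed stand-ins for the real PredicateLeaf.Operator and TruthValue):

public class NullStatsSketch {
  enum TruthValue { YES, NO, NULL }
  enum Operator { IS_NULL, EQUALS }

  // Mirrors the hoisted check: no min value means every row was null.
  static TruthValue evaluate(Operator op, Object min) {
    if (min == null) {
      return (op == Operator.IS_NULL) ? TruthValue.YES : TruthValue.NULL;
    }
    return TruthValue.NO; // the real range comparison is elided here
  }

  public static void main(String[] args) {
    System.out.println(evaluate(Operator.IS_NULL, null)); // YES
    System.out.println(evaluate(Operator.EQUALS, null));  // NULL
  }
}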
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.io.orc;
import java.sql.Timestamp;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Nov 18 00:48:40 2014
@@ -2230,7 +2230,8 @@ class WriterImpl implements Writer, Memo
.setMetadataLength(metadataLength)
.setMagic(OrcFile.MAGIC)
.addVersion(version.getMajor())
- .addVersion(version.getMinor());
+ .addVersion(version.getMinor())
+ .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
if (compress != CompressionKind.NONE) {
builder.setCompressionBlockSize(bufferSize);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Tue Nov 18 00:48:40 2014
@@ -63,6 +63,8 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
import parquet.io.api.Binary;
/**
@@ -70,13 +72,18 @@ import parquet.io.api.Binary;
* A ParquetHiveSerDe for Hive (with the deprecated package mapred)
*
*/
-@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+ ParquetOutputFormat.COMPRESSION})
public class ParquetHiveSerDe extends AbstractSerDe {
public static final Text MAP_KEY = new Text("key");
public static final Text MAP_VALUE = new Text("value");
public static final Text MAP = new Text("map");
public static final Text ARRAY = new Text("bag");
+ // default compression type for parquet output format
+ private static final String DEFAULTCOMPRESSION =
+ ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
+
// Map precision to the number bytes needed for binary conversion.
public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
static {
@@ -99,6 +106,7 @@ public class ParquetHiveSerDe extends Ab
private LAST_OPERATION status;
private long serializedSize;
private long deserializedSize;
+ private String compressionType;
@Override
public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
@@ -110,6 +118,9 @@ public class ParquetHiveSerDe extends Ab
final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ // Get compression properties
+ compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION);
+
if (columnNameProperty.length() == 0) {
columnNames = new ArrayList<String>();
} else {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Tue Nov 18 00:48:40 2014
@@ -18,10 +18,12 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +57,13 @@ public class ParquetRecordWriterWrapper
}
taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
+ LOG.info("initialize serde with table properties.");
+ initializeSerProperties(taskContext, tableProperties);
+
LOG.info("creating real writer to write at " + name);
- String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
- if (compressionName != null && !compressionName.isEmpty()) {
- //get override compression properties via "tblproperties" clause if it is set
- LOG.debug("get override compression properties via tblproperties");
-
- ContextUtil.getConfiguration(taskContext);
- CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
- realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
- new Path(name), codecName);
- } else {
- realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
- new Path(name));
- }
+ realWriter =
+ ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
LOG.info("real writer: " + realWriter);
} catch (final InterruptedException e) {
@@ -77,6 +71,31 @@ public class ParquetRecordWriterWrapper
}
}
+ private void initializeSerProperties(JobContext job, Properties tableProperties) {
+ String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
+ Configuration conf = ContextUtil.getConfiguration(job);
+ if (blockSize != null && !blockSize.isEmpty()) {
+ LOG.debug("get override parquet.block.size property via tblproperties");
+ conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
+ }
+
+ String enableDictionaryPage =
+ tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
+ if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) {
+ LOG.debug("get override parquet.enable.dictionary property via tblproperties");
+ conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY,
+ Boolean.valueOf(enableDictionaryPage));
+ }
+
+ String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+ if (compressionName != null && !compressionName.isEmpty()) {
+ //get override compression properties via "tblproperties" clause if it is set
+ LOG.debug("get override compression properties via tblproperties");
+ CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+ conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
+ }
+ }
+
@Override
public void close(final Reporter reporter) throws IOException {
try {
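
The new initializeSerProperties centralizes the tblproperties overrides onto the task Configuration before the writer is created. A stripped-down sketch of the pattern (java.util.Properties stands in for the Hadoop Configuration; "parquet.compression" is the value of ParquetOutputFormat.COMPRESSION):

import java.util.Properties;

public class SerPropsSketch {
  public static void main(String[] args) {
    Properties tableProperties = new Properties(); // from the table definition
    tableProperties.setProperty("parquet.compression", "GZIP");
    Properties conf = new Properties();            // stands in for the task Configuration

    String compressionName = tableProperties.getProperty("parquet.compression");
    if (compressionName != null && !compressionName.isEmpty()) {
      // the real code validates via CompressionCodecName.fromConf(...) first
      conf.setProperty("parquet.compression", compressionName);
    }
    System.out.println(conf.getProperty("parquet.compression")); // GZIP
  }
}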
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.io.sarg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.lib;
import org.apache.hadoop.hive.ql.exec.Operator;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Nov 18 00:48:40 2014
@@ -92,6 +92,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -748,8 +749,9 @@ public class Hive {
throw new HiveException("Table name " + indexTblName + " already exists. Choose another name.");
}
- org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().deepCopy();
- SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+ SerDeInfo serdeInfo = new SerDeInfo();
+ serdeInfo.setName(indexTblName);
+
if(serde != null) {
serdeInfo.setSerializationLib(serde);
} else {
@@ -762,6 +764,7 @@ public class Hive {
}
}
+ serdeInfo.setParameters(new HashMap<String, String>());
if (fieldDelim != null) {
serdeInfo.getParameters().put(FIELD_DELIM, fieldDelim);
serdeInfo.getParameters().put(SERIALIZATION_FORMAT, fieldDelim);
@@ -788,18 +791,8 @@ public class Hive {
}
}
- storageDescriptor.setLocation(null);
- if (location != null) {
- storageDescriptor.setLocation(location);
- }
- storageDescriptor.setInputFormat(inputFormat);
- storageDescriptor.setOutputFormat(outputFormat);
-
- Map<String, String> params = new HashMap<String,String>();
-
List<FieldSchema> indexTblCols = new ArrayList<FieldSchema>();
List<Order> sortCols = new ArrayList<Order>();
- storageDescriptor.setBucketCols(null);
int k = 0;
Table metaBaseTbl = new Table(baseTbl);
for (int i = 0; i < metaBaseTbl.getCols().size(); i++) {
@@ -815,9 +808,6 @@ public class Hive {
"Check the index columns, they should appear in the table being indexed.");
}
- storageDescriptor.setCols(indexTblCols);
- storageDescriptor.setSortCols(sortCols);
-
int time = (int) (System.currentTimeMillis() / 1000);
org.apache.hadoop.hive.metastore.api.Table tt = null;
HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass);
@@ -851,8 +841,21 @@ public class Hive {
String tdname = Utilities.getDatabaseName(tableName);
String ttname = Utilities.getTableName(tableName);
+
+ StorageDescriptor indexSd = new StorageDescriptor(
+ indexTblCols,
+ location,
+ inputFormat,
+ outputFormat,
+ false/*compressed - not used*/,
+ -1/*numBuckets - default is -1 when the table has no buckets*/,
+ serdeInfo,
+ null/*bucketCols*/,
+ sortCols,
+ null/*parameters*/);
+
Index indexDesc = new Index(indexName, indexHandlerClass, tdname, ttname, time, time, indexTblName,
- storageDescriptor, params, deferredRebuild);
+ indexSd, new HashMap<String,String>(), deferredRebuild);
if (indexComment != null) {
indexDesc.getParameters().put("comment", indexComment);
}
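
The net effect of the Hive.java hunks: rather than deep-copying the base table's StorageDescriptor and then nulling out location, bucket columns, and sort columns field by field, the index table's descriptor is now built from scratch with only what the index needs. A simplified stand-in (a Map replaces the metastore StorageDescriptor; field names are illustrative):

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IndexSdSketch {
  static Map<String, Object> newIndexSd(List<String> cols, String location,
      String inputFormat, String outputFormat, List<String> sortCols) {
    Map<String, Object> sd = new LinkedHashMap<String, Object>();
    sd.put("cols", cols);          // index columns only, never the base table's
    sd.put("location", location);  // may be null; no stale copied path to clear
    sd.put("inputFormat", inputFormat);
    sd.put("outputFormat", outputFormat);
    sd.put("compressed", false);   // not used for indexes
    sd.put("numBuckets", -1);      // default when the table has no buckets
    sd.put("sortCols", sortCols);
    return sd;
  }

  public static void main(String[] args) {
    System.out.println(newIndexSd(Arrays.asList("_bucketname", "_offsets"), null,
        "TextInputFormat", "HiveIgnoreKeyTextOutputFormat",
        Collections.<String>emptyList()));
  }
}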
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.metadata;
import java.io.IOException;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Nov 18 00:48:40 2014
@@ -240,6 +240,9 @@ public class ConvertJoinMapJoin implemen
new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+ mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
+ mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
+ mapJoinDesc.resetOrder();
}
@SuppressWarnings("unchecked")
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.optimizer;
import java.util.Stack;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
import java.util.ArrayList;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.optimizer.optiq;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.optimizer.optiq.cost;
import org.eigenbase.rel.RelCollationTraitDef;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java Tue Nov 18 00:48:40 2014
@@ -38,114 +38,121 @@ import org.eigenbase.sql.SqlKind;
public abstract class HivePushFilterPastJoinRule extends PushFilterPastJoinRule {
- public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();
+ public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();
- public static final HivePushFilterPastJoinRule JOIN = new HivePushDownJoinConditionRule();
+ public static final HivePushFilterPastJoinRule JOIN = new HivePushDownJoinConditionRule();
- /**
- * Creates a PushFilterPastJoinRule with an explicit root operand.
- */
- protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id,
- boolean smart, RelFactories.FilterFactory filterFactory,
- RelFactories.ProjectFactory projectFactory) {
- super(operand, id, smart, filterFactory, projectFactory);
- }
-
- /**
- * Rule that tries to push filter expressions into a join condition and into
- * the inputs of the join.
- */
- public static class HivePushFilterIntoJoinRule extends
- HivePushFilterPastJoinRule {
- public HivePushFilterIntoJoinRule() {
- super(RelOptRule.operand(FilterRelBase.class,
- RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
- "HivePushFilterPastJoinRule:filter", true,
- HiveFilterRel.DEFAULT_FILTER_FACTORY,
- HiveProjectRel.DEFAULT_PROJECT_FACTORY);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- FilterRelBase filter = call.rel(0);
- JoinRelBase join = call.rel(1);
- super.perform(call, filter, join);
- }
- }
-
- public static class HivePushDownJoinConditionRule extends
- HivePushFilterPastJoinRule {
- public HivePushDownJoinConditionRule() {
- super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
- "HivePushFilterPastJoinRule:no-filter", true,
- HiveFilterRel.DEFAULT_FILTER_FACTORY,
- HiveProjectRel.DEFAULT_PROJECT_FACTORY);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- JoinRelBase join = call.rel(0);
- super.perform(call, null, join);
- }
- }
-
- /*
- * Any predicates pushed down to joinFilters that aren't equality
- * conditions: put them back as aboveFilters because Hive doesn't support
- * not equi join conditions.
- */
- @Override
- protected void validateJoinFilters(List<RexNode> aboveFilters,
- List<RexNode> joinFilters, JoinRelBase join, JoinRelType joinType) {
- if (joinType.equals(JoinRelType.INNER)) {
- ListIterator<RexNode> filterIter = joinFilters.listIterator();
- while (filterIter.hasNext()) {
- RexNode exp = filterIter.next();
- if (exp instanceof RexCall) {
- RexCall c = (RexCall) exp;
- if (c.getOperator().getKind() == SqlKind.EQUALS) {
- boolean validHiveJoinFilter = true;
- for (RexNode rn : c.getOperands()) {
- // NOTE: Hive dis-allows projections from both left
- // &
- // right side
- // of join condition. Example: Hive disallows
- // (r1.x=r2.x)=(r1.y=r2.y) on join condition.
- if (filterRefersToBothSidesOfJoin(rn, join)) {
- validHiveJoinFilter = false;
- break;
- }
- }
- if (validHiveJoinFilter)
- continue;
- }
- }
- aboveFilters.add(exp);
- filterIter.remove();
- }
- }
- }
-
- private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
- boolean refersToBothSides = false;
-
- int joinNoOfProjects = j.getRowType().getFieldCount();
- BitSet filterProjs = new BitSet(joinNoOfProjects);
- BitSet allLeftProjs = new BitSet(joinNoOfProjects);
- BitSet allRightProjs = new BitSet(joinNoOfProjects);
- allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
- allRightProjs.set(j.getInput(0).getRowType().getFieldCount(),
- joinNoOfProjects, true);
-
- InputFinder inputFinder = new InputFinder(filterProjs);
- filter.accept(inputFinder);
-
- if (allLeftProjs.intersects(filterProjs)
- && allRightProjs.intersects(filterProjs))
- refersToBothSides = true;
+ /**
+ * Creates a PushFilterPastJoinRule with an explicit root operand.
+ */
+ protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id, boolean smart,
+ RelFactories.FilterFactory filterFactory, RelFactories.ProjectFactory projectFactory) {
+ super(operand, id, smart, filterFactory, projectFactory);
+ }
+
+ /**
+ * Rule that tries to push filter expressions into a join condition and into
+ * the inputs of the join.
+ */
+ public static class HivePushFilterIntoJoinRule extends HivePushFilterPastJoinRule {
+ public HivePushFilterIntoJoinRule() {
+ super(RelOptRule.operand(FilterRelBase.class,
+ RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
+ "HivePushFilterPastJoinRule:filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+ HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ FilterRelBase filter = call.rel(0);
+ JoinRelBase join = call.rel(1);
+ super.perform(call, filter, join);
+ }
+ }
+
+ public static class HivePushDownJoinConditionRule extends HivePushFilterPastJoinRule {
+ public HivePushDownJoinConditionRule() {
+ super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
+ "HivePushFilterPastJoinRule:no-filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+ HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ JoinRelBase join = call.rel(0);
+ super.perform(call, null, join);
+ }
+ }
+
+ /*
+   * Any predicates pushed down to joinFilters that Hive cannot evaluate as
+   * join conditions are put back as aboveFilters: Hive supports only equality
+   * conditions and single-sided range comparisons in a join.
+ */
+ @Override
+ protected void validateJoinFilters(List<RexNode> aboveFilters, List<RexNode> joinFilters,
+ JoinRelBase join, JoinRelType joinType) {
+ if (joinType.equals(JoinRelType.INNER)) {
+ ListIterator<RexNode> filterIter = joinFilters.listIterator();
+ while (filterIter.hasNext()) {
+ RexNode exp = filterIter.next();
+
+ if (exp instanceof RexCall) {
+ RexCall c = (RexCall) exp;
+ boolean validHiveJoinFilter = false;
+
+ if ((c.getOperator().getKind() == SqlKind.EQUALS)) {
+ validHiveJoinFilter = true;
+ for (RexNode rn : c.getOperands()) {
+            // NOTE: Hive disallows a single operand of an equality join
+            // condition from referencing both the left & right side. Example:
+            // Hive disallows (r1.x + r2.x) = (r1.y + r2.y) as a join condition.
+ if (filterRefersToBothSidesOfJoin(rn, join)) {
+ validHiveJoinFilter = false;
+ break;
+ }
+ }
+ } else if ((c.getOperator().getKind() == SqlKind.LESS_THAN)
+ || (c.getOperator().getKind() == SqlKind.GREATER_THAN)
+ || (c.getOperator().getKind() == SqlKind.LESS_THAN_OR_EQUAL)
+ || (c.getOperator().getKind() == SqlKind.GREATER_THAN_OR_EQUAL)) {
+ validHiveJoinFilter = true;
+          // NOTE: Hive disallows an inequality join condition that references
+          // both the left & right side of the join. Example: Hive disallows
+          // (r1.x < r2.x) as a join condition.
+ if (filterRefersToBothSidesOfJoin(c, join)) {
+ validHiveJoinFilter = false;
+ }
+ }
+
+ if (validHiveJoinFilter)
+ continue;
+ }
+
+ aboveFilters.add(exp);
+ filterIter.remove();
+ }
+ }
+ }
+
+ private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
+ boolean refersToBothSides = false;
+
+ int joinNoOfProjects = j.getRowType().getFieldCount();
+ BitSet filterProjs = new BitSet(joinNoOfProjects);
+ BitSet allLeftProjs = new BitSet(joinNoOfProjects);
+ BitSet allRightProjs = new BitSet(joinNoOfProjects);
+ allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
+ allRightProjs.set(j.getInput(0).getRowType().getFieldCount(), joinNoOfProjects, true);
- return refersToBothSides;
- }
+ InputFinder inputFinder = new InputFinder(filterProjs);
+ filter.accept(inputFinder);
+
+ if (allLeftProjs.intersects(filterProjs) && allRightProjs.intersects(filterProjs))
+ refersToBothSides = true;
+
+ return refersToBothSides;
+ }
}
// End PushFilterPastJoinRule.java
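
The rewritten validateJoinFilters above keeps an INNER-join predicate as a join
filter only if it is an equality whose individual operands each touch one input,
or a <, >, <=, >= comparison that does not span both inputs; everything else is
lifted back above the join. Below is a minimal, self-contained sketch of the
side-membership test it relies on; the class name, column layout, and main()
harness are illustrative, not part of this patch.

    import java.util.BitSet;

    public class JoinSideCheck {
      // Columns 0..leftWidth-1 come from the left input, the rest from the
      // right, mirroring how filterRefersToBothSidesOfJoin splits the row type.
      static boolean refersToBothSides(BitSet filterCols, int leftWidth, int totalCols) {
        BitSet left = new BitSet(totalCols);
        left.set(0, leftWidth);
        BitSet right = new BitSet(totalCols);
        right.set(leftWidth, totalCols);
        return filterCols.intersects(left) && filterCols.intersects(right);
      }

      public static void main(String[] args) {
        int leftWidth = 2, totalCols = 4;      // r1(x, y) JOIN r2(x, y)
        BitSet equalityOperand = new BitSet(totalCols);
        equalityOperand.set(0);                // operand r1.x of "r1.x = r2.x"
        BitSet rangePredicate = new BitSet(totalCols);
        rangePredicate.set(0);
        rangePredicate.set(2);                 // whole predicate "r1.x < r2.x"
        System.out.println(refersToBothSides(equalityOperand, leftWidth, totalCols)); // false: valid
        System.out.println(refersToBothSides(rangePredicate, leftWidth, totalCols));  // true: lifted
      }
    }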
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java Tue Nov 18 00:48:40 2014
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.hadoop.hive.ql.optimizer.optiq.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
@@ -32,6 +35,10 @@ import org.eigenbase.rex.RexInputRef;
import org.eigenbase.rex.RexNode;
import org.eigenbase.rex.RexVisitorImpl;
import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.type.SqlTypeUtil;
+
+import com.google.common.collect.Sets;
public class FilterSelectivityEstimator extends RexVisitorImpl<Double> {
private final RelNode childRel;
@@ -61,7 +68,7 @@ public class FilterSelectivityEstimator
}
Double selectivity = null;
- SqlKind op = call.getKind();
+ SqlKind op = getOp(call);
switch (op) {
case AND: {
@@ -74,6 +81,7 @@ public class FilterSelectivityEstimator
break;
}
+ case NOT:
case NOT_EQUALS: {
selectivity = computeNotEqualitySelectivity(call);
break;
@@ -88,7 +96,16 @@ public class FilterSelectivityEstimator
}
case IN: {
- selectivity = ((double) 1 / ((double) call.operands.size()));
+      // TODO: 1) check for duplicate values; 2) we assume the IN-clause values
+      // are present in the column's NDV set, which may not be correct (a range
+      // check could detect this); 3) we assume the NDV values are uniformly
+      // distributed over the column values (account for skew via a histogram).
+ selectivity = computeFunctionSelectivity(call) * (call.operands.size() - 1);
+ if (selectivity <= 0.0) {
+ selectivity = 0.10;
+ } else if (selectivity >= 1.0) {
+ selectivity = 1.0;
+ }
break;
}
@@ -152,18 +169,19 @@ public class FilterSelectivityEstimator
}
tmpCardinality = childCardinality * tmpSelectivity;
- if (tmpCardinality > 1)
+ if (tmpCardinality > 1 && tmpCardinality < childCardinality) {
tmpSelectivity = (1 - tmpCardinality / childCardinality);
- else
+ } else {
tmpSelectivity = 1.0;
+ }
selectivity *= tmpSelectivity;
}
- if (selectivity > 1)
- return (1 - selectivity);
- else
- return 1.0;
+ if (selectivity < 0.0)
+ selectivity = 0.0;
+
+ return (1 - selectivity);
}
/**
@@ -225,4 +243,19 @@ public class FilterSelectivityEstimator
}
return false;
}
+
+ private SqlKind getOp(RexCall call) {
+ SqlKind op = call.getKind();
+
+ if (call.getKind().equals(SqlKind.OTHER_FUNCTION)
+ && SqlTypeUtil.inBooleanFamily(call.getType())) {
+ SqlOperator sqlOp = call.getOperator();
+ String opName = (sqlOp != null) ? sqlOp.getName() : "";
+ if (opName.equalsIgnoreCase("in")) {
+ op = SqlKind.IN;
+ }
+ }
+
+ return op;
+ }
}
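
For intuition on the new IN(...) estimate that the getOp helper above now routes
to: assuming computeFunctionSelectivity returns roughly 1/NDV for the column
operand (an assumption for this sketch, not stated in the patch), the estimate
scales that by the number of listed values and clamps the result. The first
operand of the RexCall is the column itself, hence operands.size() - 1.

    public class InSelectivitySketch {
      static double inSelectivity(double perValueSelectivity, int operandCount) {
        // operandCount includes the column reference, hence "- 1" for the values.
        double s = perValueSelectivity * (operandCount - 1);
        if (s <= 0.0) {
          return 0.10;  // fallback when stats are missing or degenerate
        } else if (s >= 1.0) {
          return 1.0;   // a filter never selects more than all rows
        }
        return s;
      }

      public static void main(String[] args) {
        // Assume NDV = 200, so each distinct value matches ~1/200 of the rows.
        // "col IN (1, 2, 3, 4, 5)" is a RexCall with 6 operands (column + 5 values).
        System.out.println(inSelectivity(1.0 / 200, 6));  // 0.025
        // The old flat estimate, 1 / operands.size(), would give ~0.167 here
        // regardless of how selective the column actually is.
      }
    }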
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ import org.eigenbase.rel.metadata.Metada
import org.eigenbase.rel.metadata.ReflectiveRelMetadataProvider;
import org.eigenbase.rel.metadata.RelMdUniqueKeys;
import org.eigenbase.rel.metadata.RelMetadataProvider;
+import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.relopt.hep.HepRelVertex;
import org.eigenbase.rex.RexInputRef;
import org.eigenbase.rex.RexNode;
@@ -100,7 +101,7 @@ public class HiveRelMdUniqueKeys {
cStat.getRange().minValue != null) {
double r = cStat.getRange().maxValue.doubleValue() -
cStat.getRange().minValue.doubleValue() + 1;
- isKey = (numRows == r);
+ isKey = (Math.abs(numRows - r) < RelOptUtil.EPSILON);
}
if ( isKey ) {
BitSet key = new BitSet();
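
The HiveRelMdUniqueKeys change replaces an exact double comparison with an
epsilon test. Both numRows and the min/max range are doubles derived from
column stats, so rounding noise can defeat == even when the column genuinely
is a key. A standalone illustration; the epsilon constant here is a stand-in
for RelOptUtil.EPSILON.

    public class EpsilonKeyCheck {
      static final double EPSILON = 1.0e-5;  // stand-in for RelOptUtil.EPSILON

      public static void main(String[] args) {
        double numRows = 1000000.0 * 1.1;    // row count scaled by a stats factor
        double range = 1100000.0 - 1.0 + 1;  // maxValue - minValue + 1
        System.out.println(numRows == range);                     // false: rounding noise
        System.out.println(Math.abs(numRows - range) < EPSILON);  // true: robust test
      }
    }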
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Tue Nov 18 00:48:40 2014
@@ -278,6 +278,7 @@ public class SqlFunctionConverter {
registerFunction(">=", SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
hToken(HiveParser.GREATERTHANOREQUALTO, ">="));
registerFunction("!", SqlStdOperatorTable.NOT, hToken(HiveParser.KW_NOT, "not"));
+ registerFunction("<>", SqlStdOperatorTable.NOT_EQUALS, hToken(HiveParser.NOTEQUAL, "<>"));
}
private void registerFunction(String name, SqlOperator optiqFn, HiveToken hiveToken) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java Tue Nov 18 00:48:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
public class PartExprEvalUtils {
@@ -103,11 +104,13 @@ public class PartExprEvalUtils {
}
static synchronized public ObjectPair<PrimitiveObjectInspector, ExprNodeEvaluator> prepareExpr(
- ExprNodeGenericFuncDesc expr, List<String> partNames) throws HiveException {
+ ExprNodeGenericFuncDesc expr, List<String> partNames,
+ List<PrimitiveTypeInfo> partColumnTypeInfos) throws HiveException {
// Create the row object
List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
for (int i = 0; i < partNames.size(); i++) {
- partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ partObjectInspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+ partColumnTypeInfos.get(i)));
}
StructObjectInspector objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(partNames, partObjectInspectors);
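
The PartExprEvalUtils change stops treating every partition column as a string:
each object inspector is now derived from the declared partition column type,
so partition predicates are evaluated against values of the right type rather
than their string form. A minimal sketch using the serde factories imported in
the hunk; the column types below are invented for illustration.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class TypedPartColInspectors {
      public static void main(String[] args) {
        List<PrimitiveTypeInfo> partColTypes = new ArrayList<PrimitiveTypeInfo>();
        partColTypes.add(TypeInfoFactory.intTypeInfo);     // e.g. hr INT
        partColTypes.add(TypeInfoFactory.stringTypeInfo);  // e.g. ds STRING

        List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
        for (PrimitiveTypeInfo typeInfo : partColTypes) {
          // Previously every column got javaStringObjectInspector; now the
          // inspector matches the column's actual primitive type.
          inspectors.add(
              PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo));
        }
        System.out.println(inspectors);
      }
    }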
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
/**
* The basic implementation of PartitionExpressionProxy that uses ql package classes.
@@ -40,13 +41,14 @@ public class PartitionExpressionForMetas
}
@Override
- public boolean filterPartitionsByExpr(List<String> columnNames, byte[] exprBytes,
+ public boolean filterPartitionsByExpr(List<String> partColumnNames,
+ List<PrimitiveTypeInfo> partColumnTypeInfos, byte[] exprBytes,
String defaultPartitionName, List<String> partitionNames) throws MetaException {
ExprNodeGenericFuncDesc expr = deserializeExpr(exprBytes);
try {
long startTime = System.nanoTime(), len = partitionNames.size();
boolean result = PartitionPruner.prunePartitionNames(
- columnNames, expr, defaultPartitionName, partitionNames);
+ partColumnNames, partColumnTypeInfos, expr, defaultPartitionName, partitionNames);
double timeMs = (System.nanoTime() - startTime) / 1000000.0;
LOG.debug("Pruning " + len + " partition names took " + timeMs + "ms");
return result;