Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/05 08:01:58 UTC
svn commit: r1538880 [4/46] - in /hive/branches/tez: ./ ant/
ant/src/org/apache/hadoop/hive/ant/ beeline/
beeline/src/java/org/apache/hive/beeline/ beeline/src/main/
beeline/src/test/org/apache/hive/beeline/src/test/ cli/ common/
common/src/java/conf/ ...
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java Tue Nov 5 07:01:32 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -195,6 +196,12 @@ public class HiveAlterHandler implements
msdb.alterPartition(dbname, name, part.getValues(), part);
}
}
+ } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
+ (newt.getPartitionKeysSize() == 0)) {
+ Database db = msdb.getDatabase(newt.getDbName());
+ // Update table stats. For partitioned table, we update stats in
+ // alterPartition()
+ MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
}
// now finally call alter table
msdb.alterTable(dbname, name, newt);
@@ -254,10 +261,10 @@ public class HiveAlterHandler implements
Path destPath = null;
FileSystem srcFs = null;
FileSystem destFs = null;
- Table tbl = null;
Partition oldPart = null;
String oldPartLoc = null;
String newPartLoc = null;
+
// Set DDL time to now if not specified
if (new_part.getParameters() == null ||
new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
@@ -265,10 +272,15 @@ public class HiveAlterHandler implements
new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
.currentTimeMillis() / 1000));
}
+
+ Table tbl = msdb.getTable(dbname, name);
//alter partition
if (part_vals == null || part_vals.size() == 0) {
try {
oldPart = msdb.getPartition(dbname, name, new_part.getValues());
+ if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
+ MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
+ }
msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
} catch (InvalidObjectException e) {
throw new InvalidOperationException("alter is not possible");
@@ -299,7 +311,6 @@ public class HiveAlterHandler implements
throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
new_part.getValues());
}
- tbl = msdb.getTable(dbname, name);
if (tbl == null) {
throw new InvalidObjectException(
"Unable to rename partition because table or database do not exist");
@@ -351,6 +362,9 @@ public class HiveAlterHandler implements
+ tbl.getTableName() + " " + new_part.getValues());
}
new_part.getSd().setLocation(newPartLoc);
+ if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
+ MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
+ }
msdb.alterPartition(dbname, name, part_vals, new_part);
}
}
@@ -399,6 +413,7 @@ public class HiveAlterHandler implements
MetaException {
List<Partition> oldParts = new ArrayList<Partition>();
List<List<String>> partValsList = new ArrayList<List<String>>();
+ Table tbl = msdb.getTable(dbname, name);
try {
for (Partition tmpPart: new_parts) {
// Set DDL time to now if not specified
@@ -408,9 +423,14 @@ public class HiveAlterHandler implements
tmpPart.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
.currentTimeMillis() / 1000));
}
+
Partition oldTmpPart = msdb.getPartition(dbname, name, tmpPart.getValues());
oldParts.add(oldTmpPart);
partValsList.add(tmpPart.getValues());
+
+ if (MetaStoreUtils.requireCalStats(hiveConf, oldTmpPart, tmpPart, tbl)) {
+ MetaStoreUtils.updatePartitionStatsFast(tmpPart, wh, false, true);
+ }
}
msdb.alterPartitions(dbname, name, partValsList, new_parts);
} catch (InvalidObjectException e) {
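
The HiveAlterHandler hunks above follow one pattern: before persisting an altered table or partition, check whether the "fast" stats (file count and total size) need recomputing and, if so, refresh them from the warehouse. A simplified, stand-alone sketch of that gate, with illustrative class and key names rather than the real Hive types:

    import java.util.HashMap;
    import java.util.Map;

    public class AlterPartitionStatsSketch {
        // Assumed key names, mirroring StatsSetupConst.NUM_FILES / TOTAL_SIZE.
        static final String NUM_FILES = "numFiles";
        static final String TOTAL_SIZE = "totalSize";

        // Stand-in for MetaStoreUtils.requireCalStats: recompute when stats
        // auto-gathering is on and the new object is missing a fast stat.
        static boolean requireCalStats(boolean autoGather, Map<String, String> newParams) {
            if (!autoGather) {
                return false;
            }
            return newParams == null
                || !newParams.containsKey(NUM_FILES)
                || !newParams.containsKey(TOTAL_SIZE);
        }

        public static void main(String[] args) {
            Map<String, String> newPartParams = new HashMap<>();
            if (requireCalStats(true, newPartParams)) {
                // In the handler this is MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true)
                newPartParams.put(NUM_FILES, "3");
                newPartParams.put(TOTAL_SIZE, "4096");
            }
            // ...and only then is the altered object persisted, e.g. msdb.alterPartition(...).
            System.out.println(newPartParams);
        }
    }
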
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Tue Nov 5 07:01:32 2013
@@ -1029,7 +1029,8 @@ public class HiveMetaStore extends Thrif
ms.openTransaction();
- if (ms.getDatabase(tbl.getDbName()) == null) {
+ Database db = ms.getDatabase(tbl.getDbName());
+ if (db == null) {
throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
}
@@ -1063,6 +1064,14 @@ public class HiveMetaStore extends Thrif
madeDir = true;
}
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+ !MetaStoreUtils.isView(tbl)) {
+ if (tbl.getPartitionKeysSize() == 0) { // Unpartitioned table
+ MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
+ } else { // Partitioned table with no partitions.
+ MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
+ }
+ }
// set create time
long time = System.currentTimeMillis() / 1000;
@@ -1540,6 +1549,11 @@ public class HiveMetaStore extends Thrif
part.setCreateTime((int) time);
part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+ !MetaStoreUtils.isView(tbl)) {
+ MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir);
+ }
+
success = ms.addPartition(part);
if (success) {
success = ms.commitTransaction();
@@ -1760,6 +1774,11 @@ public class HiveMetaStore extends Thrif
}
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+ !MetaStoreUtils.isView(tbl)) {
+ MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir);
+ }
+
// set create time
long time = System.currentTimeMillis() / 1000;
part.setCreateTime((int) time);
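
In HiveMetaStore, create_table and add_partition now populate the fast stats up front when hive.stats.autogather (HIVESTATSAUTOGATHER) is enabled and the object is not a view. The madeDir flag matters: if the metastore just created the location, the directory is known to be empty and no file listing is needed. A small illustrative sketch of that shortcut (simplified types, hard-coded key names):

    import java.util.HashMap;
    import java.util.Map;

    public class CreateTableStatsSketch {
        // Rough stand-in for updateUnpartitionedTableStatsFast / updatePartitionStatsFast:
        // skip the listing entirely when the directory was just created.
        static Map<String, String> fastStats(boolean madeDir, long[] fileLengths) {
            Map<String, String> params = new HashMap<>();
            if (!madeDir) {
                long total = 0L;
                for (long len : fileLengths) {
                    total += len;
                }
                params.put("numFiles", Integer.toString(fileLengths.length));
                params.put("totalSize", Long.toString(total));
            }
            return params;
        }

        public static void main(String[] args) {
            System.out.println(fastStats(true, new long[0]));          // {} - fresh, empty directory
            System.out.println(fastStats(false, new long[]{10, 20}));  // two files, 30 bytes total
        }
    }
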
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Tue Nov 5 07:01:32 2013
@@ -41,13 +41,17 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -140,6 +144,166 @@ public class MetaStoreUtils {
}
/**
+ * @param partParams
+ * @return True if the passed Parameters Map contains values for all "Fast Stats".
+ */
+ public static boolean containsAllFastStats(Map<String, String> partParams) {
+ List<String> fastStats = StatsSetupConst.getStatsFastCollection();
+ for (String stat : fastStats) {
+ if (!partParams.containsKey(stat)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh)
+ throws MetaException {
+ return updateUnpartitionedTableStatsFast(db, tbl, wh, false, false);
+ }
+
+ public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh,
+ boolean madeDir) throws MetaException {
+ return updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir, false);
+ }
+
+ /**
+ * Updates the numFiles and totalSize parameters for the passed unpartitioned Table by querying
+ * the warehouse if the passed Table does not already have values for these parameters.
+ * @param db
+ * @param tbl
+ * @param wh
+ * @param newDir if true, the directory was just created and can be assumed to be empty
+ * @param forceRecompute Recompute stats even if the passed Table already has
+ * these parameters set
+ * @return true if the stats were updated, false otherwise
+ */
+ public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh,
+ boolean newDir, boolean forceRecompute) throws MetaException {
+ Map<String,String> params = tbl.getParameters();
+ boolean updated = false;
+ if (forceRecompute ||
+ params == null ||
+ !containsAllFastStats(params)) {
+ if (params == null) {
+ params = new HashMap<String,String>();
+ }
+ if (!newDir) {
+ // The table location already exists and may contain data.
+ // Let's try to populate those stats that don't require full scan.
+ LOG.info("Updating table stats fast for " + tbl.getTableName());
+ FileStatus[] fileStatus = wh.getFileStatusesForUnpartitionedTable(db, tbl);
+ params.put(StatsSetupConst.NUM_FILES, Integer.toString(fileStatus.length));
+ long tableSize = 0L;
+ for (FileStatus status : fileStatus) {
+ tableSize += status.getLen();
+ }
+ params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(tableSize));
+ LOG.info("Updated size of table " + tbl.getTableName() +" to "+ Long.toString(tableSize));
+ if (params.containsKey(StatsSetupConst.ROW_COUNT) ||
+ params.containsKey(StatsSetupConst.RAW_DATA_SIZE)) {
+ // TODO: Add a MetaStore flag indicating accuracy of these stats and update it here.
+ }
+ }
+ tbl.setParameters(params);
+ updated = true;
+ }
+ return updated;
+ }
+
+ // check if stats need to be (re)calculated
+ public static boolean requireCalStats(Configuration hiveConf, Partition oldPart,
+ Partition newPart, Table tbl) {
+
+ if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+ return false;
+ }
+
+ if (MetaStoreUtils.isView(tbl)) {
+ return false;
+ }
+
+ if (oldPart == null && newPart == null) {
+ return true;
+ }
+
+ // stats need to be calculated if the new partition doesn't have them
+ if ((newPart == null) || (newPart.getParameters() == null)
+ || !containsAllFastStats(newPart.getParameters())) {
+ return true;
+ }
+
+ // stats need to be recalculated if the new and old partitions have different fast stats
+ if ((oldPart != null) && (oldPart.getParameters() != null)) {
+ for (String stat : StatsSetupConst.getStatsFastCollection()) {
+ if (oldPart.getParameters().containsKey(stat)) {
+ Long oldStat = Long.parseLong(oldPart.getParameters().get(stat));
+ Long newStat = Long.parseLong(newPart.getParameters().get(stat));
+ if (oldStat != newStat) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ public static boolean updatePartitionStatsFast(Partition part, Warehouse wh)
+ throws MetaException {
+ return updatePartitionStatsFast(part, wh, false, false);
+ }
+
+ public static boolean updatePartitionStatsFast(Partition part, Warehouse wh, boolean madeDir)
+ throws MetaException {
+ return updatePartitionStatsFast(part, wh, madeDir, false);
+ }
+
+ /**
+ * Updates the numFiles and totalSize parameters for the passed Partition by querying
+ * the warehouse if the passed Partition does not already have values for these parameters.
+ * @param part
+ * @param wh
+ * @param madeDir if true, the directory was just created and can be assumed to be empty
+ * @param forceRecompute Recompute stats even if the passed Partition already has
+ * these parameters set
+ * @return true if the stats were updated, false otherwise
+ */
+ public static boolean updatePartitionStatsFast(Partition part, Warehouse wh,
+ boolean madeDir, boolean forceRecompute) throws MetaException {
+ Map<String,String> params = part.getParameters();
+ boolean updated = false;
+ if (forceRecompute ||
+ params == null ||
+ !containsAllFastStats(params)) {
+ if (params == null) {
+ params = new HashMap<String,String>();
+ }
+ if (!madeDir) {
+ // The partition location already existed and may contain data. Let's try to
+ // populate those statistics that don't require a full scan of the data.
+ LOG.warn("Updating partition stats fast for: " + part.getTableName());
+ FileStatus[] fileStatus = wh.getFileStatusesForPartition(part);
+ params.put(StatsSetupConst.NUM_FILES, Integer.toString(fileStatus.length));
+ long partSize = 0L;
+ for (int i = 0; i < fileStatus.length; i++) {
+ partSize += fileStatus[i].getLen();
+ }
+ params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(partSize));
+ LOG.warn("Updated size to " + Long.toString(partSize));
+ if (params.containsKey(StatsSetupConst.ROW_COUNT) ||
+ params.containsKey(StatsSetupConst.RAW_DATA_SIZE)) {
+ // The accuracy of these "collectable" stats at this point is suspect unless we know that
+ // StatsTask was just run before this MetaStore call and populated them.
+ // TODO: Add a MetaStore flag indicating accuracy of these stats and update it here.
+ }
+ }
+ part.setParameters(params);
+ updated = true;
+ }
+ return updated;
+ }
+
+ /**
* getDeserializer
*
* Get the Deserializer for a table given its name and properties.
@@ -1122,6 +1286,13 @@ public class MetaStoreUtils {
return filter.toString();
}
+ public static boolean isView(Table table) {
+ if (table == null) {
+ return false;
+ }
+ return TableType.VIRTUAL_VIEW.toString().equals(table.getTableType());
+ }
+
/**
* create listener instances as per the configuration.
*
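
The new MetaStoreUtils.requireCalStats above is the decision point for all of the callers shown earlier: stats are (re)computed when auto-gathering is on, the object is not a view, and the new partition either lacks a fast stat or disagrees with the old one. A stand-alone sketch of that decision (illustrative names; note this sketch compares the values as primitive longs, whereas the hunk compares boxed Long objects with !=, which is an identity check):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RequireCalStatsSketch {
        static final List<String> FAST_STATS = Arrays.asList("numFiles", "totalSize");

        static boolean requireCalStats(boolean autoGather, boolean isView,
                                       Map<String, String> oldParams, Map<String, String> newParams) {
            if (!autoGather || isView) {
                return false;                       // gathering disabled, or views carry no data
            }
            if (oldParams == null && newParams == null) {
                return true;                        // e.g. the unpartitioned alter-table path
            }
            if (newParams == null || !newParams.keySet().containsAll(FAST_STATS)) {
                return true;                        // new object is missing a fast stat
            }
            for (String stat : FAST_STATS) {        // recompute when old and new disagree
                if (oldParams != null && oldParams.containsKey(stat)
                    && Long.parseLong(oldParams.get(stat)) != Long.parseLong(newParams.get(stat))) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            Map<String, String> oldP = new HashMap<>();
            oldP.put("numFiles", "2");
            oldP.put("totalSize", "100");
            Map<String, String> newP = new HashMap<>(oldP);
            System.out.println(requireCalStats(true, false, oldP, newP));   // false - unchanged
            newP.put("totalSize", "200");
            System.out.println(requireCalStats(true, false, oldP, newP));   // true - sizes differ
        }
    }
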
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java Tue Nov 5 07:01:32 2013
@@ -45,11 +45,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
@@ -496,6 +500,63 @@ public class Warehouse {
}
/**
+ * @param partn
+ * @return array of FileStatus objects corresponding to the files making up the passed partition
+ */
+ public FileStatus[] getFileStatusesForPartition(Partition partn)
+ throws MetaException {
+ try {
+ Path path = new Path(partn.getSd().getLocation());
+ FileSystem fileSys = path.getFileSystem(conf);
+ /* consider sub-directory created from list bucketing. */
+ int listBucketingDepth = calculateListBucketingDMLDepth(partn);
+ return HiveStatsUtils.getFileStatusRecurse(path, (1 + listBucketingDepth), fileSys);
+ } catch (IOException ioe) {
+ MetaStoreUtils.logAndThrowMetaException(ioe);
+ }
+ return null;
+ }
+
+ /**
+ * List bucketing will introduce sub-directories.
+ * Calculate the depth here in order to descend to the leaf directory
+ * so that we can count the right number of files.
+ * @param partn
+ * @return
+ */
+ private static int calculateListBucketingDMLDepth(Partition partn) {
+ // list bucketing will introduce more files
+ int listBucketingDepth = 0;
+ SkewedInfo skewedInfo = partn.getSd().getSkewedInfo();
+ if ((skewedInfo != null) && (skewedInfo.getSkewedColNames() != null)
+ && (skewedInfo.getSkewedColNames().size() > 0)
+ && (skewedInfo.getSkewedColValues() != null)
+ && (skewedInfo.getSkewedColValues().size() > 0)
+ && (skewedInfo.getSkewedColValueLocationMaps() != null)
+ && (skewedInfo.getSkewedColValueLocationMaps().size() > 0)) {
+ listBucketingDepth = skewedInfo.getSkewedColNames().size();
+ }
+ return listBucketingDepth;
+ }
+
+ /**
+ * @param table
+ * @return array of FileStatus objects corresponding to the files making up the passed
+ * unpartitioned table
+ */
+ public FileStatus[] getFileStatusesForUnpartitionedTable(Database db, Table table)
+ throws MetaException {
+ Path tablePath = getTablePath(db, table.getTableName());
+ try {
+ FileSystem fileSys = tablePath.getFileSystem(conf);
+ return HiveStatsUtils.getFileStatusRecurse(tablePath, 1, fileSys);
+ } catch (IOException ioe) {
+ MetaStoreUtils.logAndThrowMetaException(ioe);
+ }
+ return null;
+ }
+
+ /**
* Makes a valid partition name.
* @param partCols The partition columns
* @param vals The partition values
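
Warehouse.getFileStatusesForPartition above has to look below the partition directory when list bucketing is in play: each skewed column adds one directory level, so the listing recurses (1 + depth) levels. A small sketch of that depth rule (simplified inputs; the real method also checks the skewed values and location maps before counting):

    import java.util.Arrays;
    import java.util.List;

    public class ListBucketingDepthSketch {
        static int listBucketingDepth(List<String> skewedColNames) {
            return (skewedColNames == null) ? 0 : skewedColNames.size();
        }

        public static void main(String[] args) {
            // Plain partition: files sit directly under the partition directory (1 level).
            System.out.println(1 + listBucketingDepth(null));
            // Partition skewed on two columns: files live two levels deeper (3 levels).
            System.out.println(1 + listBucketingDepth(Arrays.asList("key", "value")));
        }
    }
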
Modified: hive/branches/tez/odbc/Makefile
URL: http://svn.apache.org/viewvc/hive/branches/tez/odbc/Makefile?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/odbc/Makefile (original)
+++ hive/branches/tez/odbc/Makefile Tue Nov 5 07:01:32 2013
@@ -53,7 +53,7 @@ SHELL = /bin/sh
LIBTOOL = $(SHELL) /usr/bin/libtool
LINK = ln -sf
-BUILD_DIR = $(HIVE_ROOT)/build
+BUILD_DIR = $(BASE_DIR)/target
ODBC_BUILD_DIR = $(BUILD_DIR)/odbc
OBJ_SERVICE_BUILD_DIR = $(BUILD_DIR)/service/objs
OBJ_QL_BUILD_DIR = $(BUILD_DIR)/ql/objs
@@ -73,7 +73,7 @@ SHLIB_VERSION = 1.0.0
SO_LINK_NAME = lib$(LIB_NAME).so
SO_NAME = $(SO_LINK_NAME).$(SHLIB_VERSION)
SO_LINK_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_LINK_NAME)
-SO_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_NAME)
+SO_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_NAME)
SO_INSTALL_LINK_TARGET = $(INSTALL_LIB_PATH)/$(SO_LINK_NAME)
SO_INSTALL_TARGET = $(INSTALL_LIB_PATH)/$(SO_NAME)
AR_NAME = lib$(LIB_NAME).a
Copied: hive/branches/tez/pom.xml (from r1538724, hive/trunk/pom.xml)
URL: http://svn.apache.org/viewvc/hive/branches/tez/pom.xml?p2=hive/branches/tez/pom.xml&p1=hive/trunk/pom.xml&r1=1538724&r2=1538880&rev=1538880&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/branches/tez/pom.xml Tue Nov 5 07:01:32 2013
@@ -76,6 +76,7 @@
<commons-httpclient.version>3.0.1</commons-httpclient.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang.version>2.4</commons-lang.version>
+ <commons-lang3.version>3.1</commons-lang3.version>
<commons-logging.version>1.0.4</commons-logging.version>
<commons-pool.version>1.5.4</commons-pool.version>
<derby.version>10.4.2.0</derby.version>
@@ -83,7 +84,7 @@
<groovy.version>2.1.6</groovy.version>
<hadoop-20.version>0.20.2</hadoop-20.version>
<hadoop-20S.version>1.1.2</hadoop-20S.version>
- <hadoop-23.version>2.0.5-alpha</hadoop-23.version>
+ <hadoop-23.version>2.2.0</hadoop-23.version>
<hbase.version>0.94.6.1</hbase.version>
<jackson.version>1.9.2</jackson.version>
<javaewah.version>0.3.2</javaewah.version>
@@ -109,6 +110,7 @@
<rat.version>0.8</rat.version>
<slf4j.version>1.6.1</slf4j.version>
<ST4.version>4.0.4</ST4.version>
+ <tez.version>0.2.0-SNAPSHOT</tez.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
<wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
Copied: hive/branches/tez/ql/pom.xml (from r1538724, hive/trunk/ql/pom.xml)
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/pom.xml?p2=hive/branches/tez/ql/pom.xml&p1=hive/trunk/ql/pom.xml&r1=1538724&r2=1538880&rev=1538880&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/branches/tez/ql/pom.xml Tue Nov 5 07:01:32 2013
@@ -70,6 +70,11 @@
<version>${commons-io.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
@@ -218,6 +223,59 @@
<version>${hadoop-23.version}</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency >
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop-23.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</profile>
<profile>
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Nov 5 07:01:32 2013
@@ -498,6 +498,13 @@ public class FetchOperator implements Se
* Currently only used by FetchTask.
**/
public boolean pushRow() throws IOException, HiveException {
+ if(work.getRowsComputedUsingStats() != null) {
+ for (List<Object> row : work.getRowsComputedUsingStats()) {
+ operator.process(row, 0);
+ }
+ operator.flush();
+ return true;
+ }
InspectableObject row = getNextRow();
if (row != null) {
pushRow(row);
@@ -609,6 +616,9 @@ public class FetchOperator implements Se
* returns output ObjectInspector, never null
*/
public ObjectInspector getOutputObjectInspector() throws HiveException {
+ if(null != work.getStatRowOI()) {
+ return work.getStatRowOI();
+ }
try {
if (work.isNotPartitioned()) {
return getRowInspectorFromTable(work.getTblDesc());
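
The FetchOperator change lets a query be answered without touching the data at all: when the plan already carries rows computed from metastore stats (and a matching ObjectInspector), pushRow just feeds those rows to the downstream operator. A simplified sketch of that short-circuit, using generic types instead of the Hive operator classes:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Consumer;

    public class StatsOnlyFetchSketch {
        // Returns true when the rows precomputed from stats were pushed and no scan is needed.
        static boolean pushRows(List<List<Object>> rowsFromStats, Consumer<List<Object>> operator) {
            if (rowsFromStats != null) {
                rowsFromStats.forEach(operator);
                return true;
            }
            return false;   // fall back to reading the table data
        }

        public static void main(String[] args) {
            // e.g. a single precomputed row, such as the result of a simple aggregate
            List<List<Object>> precomputed = Arrays.asList(Arrays.<Object>asList(42L));
            pushRows(precomputed, row -> System.out.println(row));
        }
    }
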
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 5 07:01:32 2013
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
@@ -51,7 +53,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.Serializer;
@@ -836,7 +837,7 @@ public class FileSinkOperator extends Te
if (conf.isLinkedFileSink()) {
level++;
}
- FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs);
+ FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(tmpPath, level, fs);
sb.append("Sample of ")
.append(Math.min(status.length, 100))
.append(" partitions created under ")
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Nov 5 07:01:32 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -194,11 +195,11 @@ public class MoveTask extends Task<MoveW
}
}
-
@Override
public int execute(DriverContext driverContext) {
try {
+
// Do any hive related operations like moving tables and files
// to appropriate locations
LoadFileDesc lfd = work.getLoadFileWork();
@@ -460,7 +461,7 @@ public class MoveTask extends Task<MoveW
boolean updateBucketCols = false;
if (bucketCols != null) {
FileSystem fileSys = partn.getPartitionPath().getFileSystem(conf);
- FileStatus[] fileStatus = Utilities.getFileStatusRecurse(
+ FileStatus[] fileStatus = HiveStatsUtils.getFileStatusRecurse(
partn.getPartitionPath(), 1, fileSys);
// Verify the number of buckets equals the number of files
// This will not hold for dynamic partitions where not every reducer produced a file for
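
FileSinkOperator and MoveTask now call HiveStatsUtils.getFileStatusRecurse (moved into hive-common) instead of the Utilities copy. Roughly, such a helper returns the entries exactly `level` directory levels below a path, which is how partition and bucket files are located. A stand-alone approximation using java.nio rather than the Hadoop FileSystem API:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;

    public class RecurseLevelSketch {
        // Entries exactly `level` levels below root, e.g. level 2 matches "partcol=x/file".
        static List<Path> listAtDepth(Path root, int level) throws IOException {
            List<Path> result = new ArrayList<>();
            try (Stream<Path> s = Files.walk(root, level)) {
                s.filter(p -> !p.equals(root) && root.relativize(p).getNameCount() == level)
                 .forEach(result::add);
            }
            return result;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(listAtDepth(Paths.get("."), 2).size());
        }
    }
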
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 5 07:01:32 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
@@ -90,9 +91,8 @@ public class ReduceSinkOperator extends
return inputAlias;
}
- // picks topN K:V pairs from input. can be null
- private transient TopNHash reducerHash;
-
+ // picks topN K:V pairs from input.
+ protected transient TopNHash reducerHash = new TopNHash();
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -136,7 +136,11 @@ public class ReduceSinkOperator extends
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
- reducerHash = createTopKHash();
+ int limit = conf.getTopN();
+ float memUsage = conf.getTopNMemoryUsage();
+ if (limit >= 0 && memUsage > 0) {
+ reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+ }
firstRow = true;
initializeChildren(hconf);
@@ -146,26 +150,8 @@ public class ReduceSinkOperator extends
}
}
- private TopNHash createTopKHash() {
- int limit = conf.getTopN();
- float percent = conf.getTopNMemoryUsage();
- if (limit < 0 || percent <= 0) {
- return null;
- }
- if (limit == 0) {
- return TopNHash.create0();
- }
- // limit * 64 : compensation of arrays for key/value/hashcodes
- long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
- if (threshold < 0) {
- return null;
- }
- return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
- }
-
transient InspectableObject tempInspectableObject = new InspectableObject();
protected transient HiveKey keyWritable = new HiveKey();
- protected transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
@@ -214,6 +200,7 @@ public class ReduceSinkOperator extends
if (outputColNames.size() > length) {
// union keys
+ assert distinctColIndices != null;
List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
for (List<Integer> distinctCols : distinctColIndices) {
List<String> names = new ArrayList<String>();
@@ -240,6 +227,9 @@ public class ReduceSinkOperator extends
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
firstRow = false;
+ // TODO: this is fishy - we init object inspectors based on first tag. We
+ // should either init for each tag, or if rowInspector doesn't really
+ // matter, then we can create this in ctor and get rid of firstRow.
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -253,32 +243,6 @@ public class ReduceSinkOperator extends
cachedValues = new Object[valueEval.length];
}
- // Evaluate the HashCode
- int keyHashCode = 0;
- if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
- }
- keyHashCode = random.nextInt();
- } else {
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
- }
- }
-
- // Evaluate the value
- for (int i = 0; i < valueEval.length; i++) {
- cachedValues[i] = valueEval[i].evaluate(row);
- }
-
// Evaluate the keys
for (int i = 0; i < numDistributionKeys; i++) {
cachedKeys[0][i] = keyEval[i].evaluate(row);
@@ -303,64 +267,21 @@ public class ReduceSinkOperator extends
}
}
- BytesWritable value = null;
- // Serialize the keys and append the tag
for (int i = 0; i < cachedKeys.length; i++) {
- if (keyIsText) {
- Text key = (Text) keySerializer.serialize(cachedKeys[i],
- keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- } else {
- // Must be BytesWritable
- BytesWritable key = (BytesWritable) keySerializer.serialize(
- cachedKeys[i], keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
- } else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
- }
- }
- keyWritable.setHashCode(keyHashCode);
-
- if (reducerHash == null) {
- if (null != out) {
- collect(keyWritable, value = getValue(row, value));
- }
- } else {
- int index = reducerHash.indexOf(keyWritable);
- if (index == TopNHash.EXCLUDED) {
- continue;
- }
- value = getValue(row, value);
- if (index >= 0) {
- reducerHash.set(index, value);
- } else {
- if (index == TopNHash.FORWARD) {
- collect(keyWritable, value);
- } else if (index == TopNHash.FLUSH) {
- LOG.info("Top-N hash is flushed");
- reducerHash.flush();
- // we can now retry adding key/value into hash, which is flushed.
- // but for simplicity, just forward them
- collect(keyWritable, value);
- } else if (index == TopNHash.DISABLE) {
- LOG.info("Top-N hash is disabled");
- reducerHash.flush();
- collect(keyWritable, value);
- reducerHash = null;
- }
- }
+ // Serialize the keys and append the tag
+ Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+ setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+ int topNIndex = reducerHash.tryStoreKey(keyWritable);
+ if (TopNHash.EXCLUDED == topNIndex) continue;
+ int keyHashCode = computeHashCode(row);
+ BytesWritable valueWritable = getValue(row);
+ if (TopNHash.FORWARD == topNIndex) {
+ keyWritable.setHashCode(keyHashCode);
+ collect(keyWritable, valueWritable);
+ continue;
}
+ assert topNIndex >= 0;
+ reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
}
} catch (HiveException e) {
throw e;
@@ -369,10 +290,58 @@ public class ReduceSinkOperator extends
}
}
- public void collect(BytesWritable key, BytesWritable value) throws IOException {
+ private int computeHashCode(Object row) throws HiveException {
+ // Evaluate the HashCode
+ int keyHashCode = 0;
+ if (partitionEval.length == 0) {
+ // If no partition cols, just distribute the data uniformly to provide
+ // better
+ // load balance. If the requirement is to have a single reducer, we
+ // should set
+ // the number of reducers to 1.
+ // Use a constant seed to make the code deterministic.
+ if (random == null) {
+ random = new Random(12345);
+ }
+ keyHashCode = random.nextInt();
+ } else {
+ for (int i = 0; i < partitionEval.length; i++) {
+ Object o = partitionEval[i].evaluate(row);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ }
+ }
+ return keyHashCode;
+ }
+
+ protected void setKeyWritable(BinaryComparable key, int tag) {
+ if (tag == -1) {
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ int keyLength = key.getLength();
+ keyWritable.setSize(keyLength + 1);
+ System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = tagByte[0];
+ }
+ }
+
+ public void collect(byte[] key, byte[] value, int hash) throws IOException {
+ HiveKey keyWritable = new HiveKey(key, hash);
+ BytesWritable valueWritable = new BytesWritable(value);
+ collect(keyWritable, valueWritable);
+ }
+
+ protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
+ HiveKey keyWritable = new HiveKey(key, hash);
+ collect(keyWritable, valueWritable);
+ }
+
+ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
// Since this is a terminal operator, update counters explicitly -
// forward is not called
- out.collect(key, value);
+ if (null != out) {
+ out.collect(keyWritable, valueWritable);
+ }
if (++outputRows % 1000 == 0) {
if (counterNameToEnum != null) {
incrCounter(numOutputRowsCntr, outputRows);
@@ -382,11 +351,7 @@ public class ReduceSinkOperator extends
}
}
- // evaluate value lazily
- private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
- if (value != null) {
- return value;
- }
+ private BytesWritable getValue(Object row) throws Exception {
// Evaluate the value
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
@@ -397,16 +362,9 @@ public class ReduceSinkOperator extends
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort && reducerHash != null) {
- try {
- reducerHash.flush();
- } catch (IOException e) {
- throw new HiveException(e);
- } finally {
- reducerHash = null;
- }
+ if (!abort) {
+ reducerHash.flush();
}
- reducerHash = null;
super.closeOp(abort);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 5 07:01:32 2013
@@ -19,23 +19,19 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.Serializable;
-import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -43,7 +39,6 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -52,12 +47,16 @@ import org.apache.hadoop.hive.ql.plan.ap
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.StringUtils;
/**
- * StatsTask implementation.
+ * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
+ * stats that require data scanning and are collected during query execution (unless the user
+ * explicitly requests data scanning just for the purpose of stats computation using the "ANALYZE"
+ * command). All other stats are computed directly by the MetaStore. The rationale is that the
+ * MetaStore layer covers all Thrift calls and provides better guarantees about the accuracy of
+ * those stats.
**/
public class StatsTask extends Task<StatsWork> implements Serializable {
@@ -67,25 +66,8 @@ public class StatsTask extends Task<Stat
private Table table;
private List<LinkedHashMap<String, String>> dpPartSpecs;
- private static final List<String> supportedStats = new ArrayList<String>();
- private static final List<String> collectableStats = new ArrayList<String>();
- private static final Map<String, String> nameMapping = new HashMap<String, String>();
- static {
- // supported statistics
- supportedStats.add(StatsSetupConst.NUM_FILES);
- supportedStats.add(StatsSetupConst.ROW_COUNT);
- supportedStats.add(StatsSetupConst.TOTAL_SIZE);
- supportedStats.add(StatsSetupConst.RAW_DATA_SIZE);
-
- // statistics that need to be collected throughout the execution
- collectableStats.add(StatsSetupConst.ROW_COUNT);
- collectableStats.add(StatsSetupConst.RAW_DATA_SIZE);
-
- nameMapping.put(StatsSetupConst.NUM_FILES, "num_files");
- nameMapping.put(StatsSetupConst.ROW_COUNT, "num_rows");
- nameMapping.put(StatsSetupConst.TOTAL_SIZE, "total_size");
- nameMapping.put(StatsSetupConst.RAW_DATA_SIZE, "raw_data_size");
- }
+ private static final List<String> collectableStats = StatsSetupConst.getStatsToBeCollected();
+ private static final List<String> supportedStats = StatsSetupConst.getSupportedStats();
public StatsTask() {
super();
@@ -94,20 +76,20 @@ public class StatsTask extends Task<Stat
/**
*
- * Partition Level Statistics.
+ * Statistics for a Partition or Unpartitioned Table
*
*/
- class PartitionStatistics {
+ class Statistics {
Map<String, LongWritable> stats;
- public PartitionStatistics() {
+ public Statistics() {
stats = new HashMap<String, LongWritable>();
for (String statType : supportedStats) {
stats.put(statType, new LongWritable(0L));
}
}
- public PartitionStatistics(Map<String, Long> st) {
+ public Statistics(Map<String, Long> st) {
stats = new HashMap<String, LongWritable>();
for (String statType : st.keySet()) {
Long stValue = st.get(statType) == null ? 0L : st.get(statType);
@@ -126,86 +108,7 @@ public class StatsTask extends Task<Stat
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- for (String statType : supportedStats) {
- sb.append(nameMapping.get(statType)).append(": ").append(stats.get(statType)).append(", ");
- }
- sb.delete(sb.length() - 2, sb.length());
- return sb.toString();
- }
- }
-
- /**
- * Table Level Statistics.
- */
- class TableStatistics extends PartitionStatistics {
- int numPartitions; // number of partitions
-
- public TableStatistics() {
- super();
- numPartitions = 0;
- }
-
- public void setNumPartitions(int np) {
- numPartitions = np;
- }
-
- public int getNumPartitions() {
- return numPartitions;
- }
-
- /**
- * Incrementally update the table statistics according to the old and new
- * partition level statistics.
- *
- * @param oldStats
- * The old statistics of a partition.
- * @param newStats
- * The new statistics of a partition.
- */
- public void updateStats(PartitionStatistics oldStats, PartitionStatistics newStats) {
- deletePartitionStats(oldStats);
- addPartitionStats(newStats);
- }
-
- /**
- * Update the table level statistics when a new partition is added.
- *
- * @param newStats
- * the new partition statistics.
- */
- public void addPartitionStats(PartitionStatistics newStats) {
- for (String statType : supportedStats) {
- LongWritable value = stats.get(statType);
- if (value == null) {
- stats.put(statType, new LongWritable(newStats.getStat(statType)));
- } else {
- value.set(value.get() + newStats.getStat(statType));
- }
- }
- this.numPartitions++;
- }
-
- /**
- * Update the table level statistics when an old partition is dropped.
- *
- * @param oldStats
- * the old partition statistics.
- */
- public void deletePartitionStats(PartitionStatistics oldStats) {
- for (String statType : supportedStats) {
- LongWritable value = stats.get(statType);
- value.set(value.get() - oldStats.getStat(statType));
- }
- this.numPartitions--;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("num_partitions: ").append(numPartitions).append(", ");
- sb.append(super.toString());
- return sb.toString();
+ return org.apache.commons.lang.StringUtils.join(supportedStats, ", ");
}
}
@@ -297,7 +200,7 @@ public class StatsTask extends Task<Stat
}
}
- TableStatistics tblStats = new TableStatistics();
+ Statistics tblStats = new Statistics();
org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
Map<String, String> parameters = tTable.getParameters();
@@ -310,10 +213,6 @@ public class StatsTask extends Task<Stat
}
}
- if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
- tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
- }
-
List<Partition> partitions = getPartitionsList();
boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
int maxPrefixLength = HiveConf.getIntVar(conf,
@@ -324,10 +223,6 @@ public class StatsTask extends Task<Stat
if (!tableStatsExist && atomic) {
return 0;
}
- long[] summary = summary(conf, table);
- tblStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
- tblStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
-
// In case of a non-partitioned table, the key for stats temporary store is "rootDir"
if (statsAggregator != null) {
String aggKey = Utilities.getHashedStatsPrefix(work.getAggKey(), maxPrefixLength);
@@ -344,6 +239,19 @@ public class StatsTask extends Task<Stat
}
}
}
+
+ // write table stats to metastore
+ parameters = tTable.getParameters();
+ for (String statType : collectableStats) {
+ parameters.put(statType, Long.toString(tblStats.getStat(statType)));
+ }
+ tTable.setParameters(parameters);
+
+ String tableFullName = table.getDbName() + "." + table.getTableName();
+
+ db.alterTable(tableFullName, new Table(tTable));
+
+ console.printInfo("Table " + tableFullName + " stats: [" + tblStats.toString() + ']');
} else {
// Partitioned table:
// Need to get the old stats of the partition
@@ -370,7 +278,7 @@ public class StatsTask extends Task<Stat
//
// get the new partition stats
//
- PartitionStatistics newPartStats = new PartitionStatistics();
+ Statistics newPartStats = new Statistics();
// In that case of a partition, the key for stats temporary store is
// "rootDir/[dynamic_partition_specs/]%"
@@ -398,16 +306,16 @@ public class StatsTask extends Task<Stat
}
}
- long[] summary = summary(conf, partn);
- newPartStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
- newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
-
- if (hasStats) {
- PartitionStatistics oldPartStats = new PartitionStatistics(currentValues);
- tblStats.updateStats(oldPartStats, newPartStats);
- } else {
- tblStats.addPartitionStats(newPartStats);
+ /**
+ * calculate fast statistics
+ */
+ FileStatus[] partfileStatus = wh.getFileStatusesForPartition(tPart);
+ newPartStats.setStat(StatsSetupConst.NUM_FILES, partfileStatus.length);
+ long partSize = 0L;
+ for (int i = 0; i < partfileStatus.length; i++) {
+ partSize += partfileStatus[i].getLen();
}
+ newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, partSize);
//
// update the metastore
@@ -429,22 +337,6 @@ public class StatsTask extends Task<Stat
}
- //
- // write table stats to metastore
- //
- parameters = tTable.getParameters();
- for (String statType : supportedStats) {
- parameters.put(statType, Long.toString(tblStats.getStat(statType)));
- }
- parameters.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(tblStats.getNumPartitions()));
- tTable.setParameters(parameters);
-
- String tableFullName = table.getDbName() + "." + table.getTableName();
-
- db.alterTable(tableFullName, new Table(tTable));
-
- console.printInfo("Table " + tableFullName + " stats: [" + tblStats.toString() + ']');
-
} catch (Exception e) {
console.printInfo("[Warning] could not update stats.",
"Failed with exception " + e.getMessage() + "\n"
@@ -464,105 +356,6 @@ public class StatsTask extends Task<Stat
return ret;
}
- private long[] summary(HiveConf conf, Partition partn) throws IOException {
- Path path = partn.getPartitionPath();
- FileSystem fs = path.getFileSystem(conf);
- List<String> skewedColNames = partn.getSkewedColNames();
- if (skewedColNames == null || skewedColNames.isEmpty()) {
- return summary(fs, path);
- }
- List<List<String>> skewColValues = table.getSkewedColValues();
- if (skewColValues == null || skewColValues.isEmpty()) {
- return summary(fs, toDefaultLBPath(path));
- }
- return summary(fs, path, skewedColNames);
- }
-
- private long[] summary(HiveConf conf, Table table) throws IOException {
- Path path = table.getPath();
- FileSystem fs = path.getFileSystem(conf);
- List<String> skewedColNames = table.getSkewedColNames();
- if (skewedColNames == null || skewedColNames.isEmpty()) {
- return summary(fs, path);
- }
- List<List<String>> skewColValues = table.getSkewedColValues();
- if (skewColValues == null || skewColValues.isEmpty()) {
- return summary(fs, toDefaultLBPath(path));
- }
- return summary(fs, path, table.getSkewedColNames());
- }
-
- private Path toDefaultLBPath(Path path) {
- return new Path(path, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
- }
-
- private long[] summary(FileSystem fs, Path path) throws IOException {
- try {
- FileStatus status = fs.getFileStatus(path);
- if (!status.isDir()) {
- return new long[] {1, status.getLen()};
- }
- } catch (FileNotFoundException e) {
- return new long[] {0, 0};
- }
- FileStatus[] children = fs.listStatus(path); // can be null
- if (children == null) {
- return new long[] {0, 0};
- }
- long numFiles = 0L;
- long tableSize = 0L;
- for (FileStatus child : children) {
- if (!child.isDir()) {
- tableSize += child.getLen();
- numFiles++;
- }
- }
- return new long[] {numFiles, tableSize};
- }
-
- private Pattern toPattern(List<String> skewCols) {
- StringBuilder builder = new StringBuilder();
- for (String skewCol : skewCols) {
- if (builder.length() > 0) {
- builder.append(Path.SEPARATOR_CHAR);
- }
- builder.append(skewCol).append('=');
- builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*");
- }
- builder.append(Path.SEPARATOR_CHAR);
- builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*$");
- return Pattern.compile(builder.toString());
- }
-
- private long[] summary(FileSystem fs, Path path, List<String> skewCols) throws IOException {
- long numFiles = 0L;
- long tableSize = 0L;
- Pattern pattern = toPattern(skewCols);
- for (FileStatus status : Utilities.getFileStatusRecurse(path, skewCols.size() + 1, fs)) {
- if (status.isDir()) {
- continue;
- }
- String relative = toRelativePath(path, status.getPath());
- if (relative == null) {
- continue;
- }
- if (relative.startsWith(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME) ||
- pattern.matcher(relative).matches()) {
- tableSize += status.getLen();
- numFiles++;
- }
- }
- return new long[] {numFiles, tableSize};
- }
-
- private String toRelativePath(Path path1, Path path2) {
- URI relative = path1.toUri().relativize(path2.toUri());
- if (relative == path2.toUri()) {
- return null;
- }
- return relative.getPath();
- }
-
private boolean existStats(Map<String, String> parameters) {
return parameters.containsKey(StatsSetupConst.ROW_COUNT)
|| parameters.containsKey(StatsSetupConst.NUM_FILES)
@@ -571,7 +364,7 @@ public class StatsTask extends Task<Stat
|| parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
}
- private void updateStats(List<String> statsList, PartitionStatistics stats,
+ private void updateStats(List<String> statsList, Statistics stats,
StatsAggregator statsAggregator, Map<String, String> parameters,
String aggKey, boolean atomic) throws HiveException {
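
StatsTask is reduced above to handling the "collectable" stats (those that need a data scan and come from the stats publisher), while the fast stats are recomputed straight from the warehouse file listing and the per-table rollup of partition stats is dropped. An illustrative sketch of that split, with hard-coded key names standing in for StatsSetupConst:

    import java.util.HashMap;
    import java.util.Map;

    public class StatsTaskSplitSketch {
        public static void main(String[] args) {
            Map<String, Long> partStats = new HashMap<>();

            // Collectable stats: aggregated from the StatsPublisher/StatsAggregator store.
            partStats.put("numRows", 1000L);
            partStats.put("rawDataSize", 123456L);

            // Fast stats: recomputed from the partition's file listing, as in the hunk above.
            long[] fileLengths = {4096L, 8192L};
            long totalSize = 0L;
            for (long len : fileLengths) {
                totalSize += len;
            }
            partStats.put("numFiles", (long) fileLengths.length);
            partStats.put("totalSize", totalSize);

            System.out.println(partStats);   // values written back to the partition parameters
        }
    }
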
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Nov 5 07:01:32 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Tue Nov 5 07:01:32 2013
@@ -20,49 +20,65 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hive.ql.io.HiveKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.OutputCollector;
/**
* Stores binary key/value in sorted manner to get top-n key/value
+ * TODO: rename to TopNHeap?
*/
-abstract class TopNHash {
+public class TopNHash {
+ public static Log LOG = LogFactory.getLog(TopNHash.class);
/**
* For interaction between operator and top-n hash.
* Currently only used to forward key/values stored in hash.
*/
- public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+ public static interface BinaryCollector {
+ public void collect(byte[] key, byte[] value, int hash) throws IOException;
}
- protected static final int FORWARD = -1;
- protected static final int EXCLUDED = -2;
- protected static final int FLUSH = -3;
- protected static final int DISABLE = -4;
+ public static final int FORWARD = -1;
+ public static final int EXCLUDED = -2;
+ private static final int FLUSH = -3;
+ private static final int DISABLE = -4;
+ private static final int MAY_FORWARD = -5;
+
+ private BinaryCollector collector;
+ private int topN;
- protected final int topN;
- protected final BinaryCollector collector;
+ private long threshold; // max heap size
+ private long usage;
- protected final long threshold; // max heap size
- protected long usage; // heap usage (not exact)
+ // binary keys, values and hashCodes of rows, lined up by index
+ private byte[][] keys;
+ private byte[][] values;
+ private int[] hashes;
+ private IndexStore indexes; // The heap over the keys, storing indexes in the array.
- // binary keys, binary values and hashcodes of keys, lined up by index
- protected final byte[][] keys;
- protected final byte[][] values;
- protected final int[] hashes;
+ private int evicted; // recently evicted index (used for next key/value)
+ private int excluded; // count of excluded rows from previous flush
- protected int evicted; // recetly evicted index (the biggest one. used for next key/value)
- protected int excluded; // count of excluded rows from previous flush
+ // temporary stuff used for vectorization
+ private int batchNumForwards = 0; // whether current batch has any forwarded keys
+ private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
- protected final Comparator<Integer> C = new Comparator<Integer>() {
+ private boolean isEnabled = false;
+
+ private final Comparator<Integer> C = new Comparator<Integer>() {
public int compare(Integer o1, Integer o2) {
byte[] key1 = keys[o1];
byte[] key2 = keys[o2];
@@ -70,29 +86,201 @@ abstract class TopNHash {
}
};
- public static TopNHash create0() {
- return new HashForLimit0();
- }
-
- public static TopNHash create(boolean grouped, int topN, long threshold,
- BinaryCollector collector) {
+ public void initialize(
+ int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+ assert topN >= 0 && memUsage > 0;
+ assert !this.isEnabled;
+ this.isEnabled = false;
+ this.topN = topN;
+ this.collector = collector;
if (topN == 0) {
- return new HashForLimit0();
+ isEnabled = true;
+ return; // topN == 0 will cause a short-circuit, don't need any initialization
}
- if (grouped) {
- return new HashForGroup(topN, threshold, collector);
- }
- return new HashForRow(topN, threshold, collector);
- }
- TopNHash(int topN, long threshold, BinaryCollector collector) {
- this.topN = topN;
- this.threshold = threshold;
- this.collector = collector;
+ // limit * 64 : reserve room for the key/value/hashcode arrays themselves
+ this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+ if (threshold < 0) {
+ return;
+ }
+ this.indexes = isMapGroupBy ? new HashForGroup() : new HashForRow();
this.keys = new byte[topN + 1][];
this.values = new byte[topN + 1][];
this.hashes = new int[topN + 1];
this.evicted = topN;
+ this.isEnabled = true;
+ }
+
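(To make the threshold arithmetic concrete with purely illustrative numbers: with a 1 GiB max heap, memUsage = 0.1 and topN = 100, the threshold works out to (long) (0.1 * 1,073,741,824) - 100 * 64 = 107,367,782 bytes, i.e. roughly 102 MB of buffered keys and values before a flush is considered.)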
+ /**
+ * Try to store the non-vectorized key.
+ * @param key Serialized key.
+ * @return TopNHash.FORWARD if the row should be forwarded;
+ * TopNHash.EXCLUDED if the row should be discarded;
+ * any other number if the row is to be stored; the index should be passed to storeValue.
+ */
+ public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+ if (!isEnabled) {
+ return FORWARD; // short-circuit quickly - forward all rows
+ }
+ if (topN == 0) {
+ return EXCLUDED; // short-circuit quickly - eat all rows
+ }
+ int index = insertKeyIntoHeap(key);
+ if (index >= 0) {
+ usage += key.getLength();
+ return index;
+ }
+ // IndexStore is trying to tell us something.
+ switch (index) {
+ case DISABLE: {
+ LOG.info("Top-N hash is disabled");
+ flushInternal();
+ isEnabled = false;
+ return FORWARD;
+ }
+ case FLUSH: {
+ LOG.info("Top-N hash is flushed");
+ flushInternal();
+ // we could now retry adding the key/value to the freshly flushed hash,
+ // but for simplicity just forward them
+ return FORWARD;
+ }
+ case FORWARD: return FORWARD;
+ case EXCLUDED: return EXCLUDED; // skip the row.
+ default: {
+ assert false;
+ throw new HiveException("Invalid result trying to store the key: " + index);
+ }
+ }
+ }
+
+
+ /**
+ * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+ * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
+ * TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
+ * any other result means the batch has been started.
+ */
+ public int startVectorizedBatch() throws IOException, HiveException {
+ if (!isEnabled) {
+ return FORWARD; // short-circuit quickly - forward all rows
+ } else if (topN == 0) {
+ return EXCLUDED; // short-circuit quickly - eat all rows
+ }
+ // Flush here if the memory usage is too high. After that, we have the entire
+ // batch already in memory anyway so we will bypass the memory checks.
+ if (usage > threshold) {
+ int excluded = this.excluded;
+ LOG.info("Top-N hash is flushing rows");
+ flushInternal();
+ if (excluded == 0) {
+ LOG.info("Top-N hash has been disabled");
+ isEnabled = false;
+ return FORWARD; // Hash is ineffective, disable.
+ }
+ }
+ if (indexToBatchIndex == null) {
+ indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+ }
+ Arrays.fill(indexToBatchIndex, -1);
+ batchNumForwards = 0;
+ return 0;
+ }
+
+ /**
+ * Try to put the key from the current vectorized batch into the heap.
+ * @param key the key.
+ * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
+ * @param results The results array, with one element per row of the batch (vrg.size).
+ * The same array should be passed across the calls for the batch; in the end, for each batch index:
+ * - TopNHash.EXCLUDED - discard the row.
+ * - positive index - store the row using storeValue, same as tryStoreRow.
+ * - negative index - forward the row. getVectorizedKeyToForward called w/this index will
+ * return the key to use so it doesn't have to be rebuilt.
+ */
+ public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
+ throws HiveException, IOException {
+ // Assumption - batchIndex is increasing; startVectorizedBatch was called
+ int size = indexes.size();
+ int index = size < topN ? size : evicted;
+ keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ Integer collisionIndex = indexes.store(index);
+ if (null != collisionIndex) {
+ // forward conditional on the survival of the corresponding key currently in indexes.
+ ++batchNumForwards;
+ results[batchIndex] = MAY_FORWARD - collisionIndex;
+ return;
+ }
+ indexToBatchIndex[index] = batchIndex;
+ results[batchIndex] = index;
+ if (size != topN) return;
+ evicted = indexes.removeBiggest(); // remove the biggest key
+ if (index == evicted) {
+ excluded++;
+ results[batchIndex] = EXCLUDED;
+ indexToBatchIndex[index] = -1;
+ return; // input key is bigger than any of keys in hash
+ }
+ removed(evicted);
+ int evictedBatchIndex = indexToBatchIndex[evicted];
+ if (evictedBatchIndex >= 0) {
+ // reset the result for the evicted index
+ results[evictedBatchIndex] = EXCLUDED;
+ indexToBatchIndex[evicted] = -1;
+ }
+ // Also evict all results grouped with this index; cannot be current key or before it.
+ if (batchNumForwards > 0) {
+ int evictedForward = (MAY_FORWARD - evicted);
+ boolean forwardRemoved = false;
+ for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
+ if (results[i] == evictedForward) {
+ results[i] = EXCLUDED;
+ forwardRemoved = true;
+ }
+ }
+ if (forwardRemoved) {
+ --batchNumForwards;
+ }
+ }
+ }
+
+ /**
+ * After a vectorized batch is processed, returns the key that caused a particular row
+ * to be forwarded. A row can only be marked for forwarding because it has the same key
+ * as some row already in the heap (for GBY), so that key from the heap can be used to
+ * emit the forwarded row.
+ * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
+ * @return The key corresponding to the row.
+ */
+ public byte[] getVectorizedKeyToForward(int index) {
+ assert index <= MAY_FORWARD;
+ return keys[MAY_FORWARD - index];
+ }
+
+ /**
+ * Stores the value for the key in the heap.
+ * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
+ * @param value The value to store.
+ * @param keyHash The key hash to store.
+ * @param vectorized Whether the result is coming from a vectorized batch.
+ */
+ public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+ values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+ hashes[index] = keyHash;
+ // Vectorized doesn't adjust usage for the keys while processing the batch
+ usage += values[index].length + (vectorized ? keys[index].length : 0);
+ }
+
+ /**
+ * Flushes all the rows cached in the heap.
+ */
+ public void flush() throws HiveException {
+ if (!isEnabled || (topN == 0)) return;
+ try {
+ flushInternal();
+ } catch (IOException ex) {
+ throw new HiveException(ex);
+ }
}
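For orientation, here is a minimal usage sketch (not part of this patch) of how a caller might drive the new non-vectorized entry points tryStoreKey/storeValue/flush; the driver class, key strings and parameter values are made up, and a real operator would forward rows downstream rather than printing them:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.TopNHash;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.io.BytesWritable;

    public class TopNHashRowSketch {
      public static void main(String[] args) throws HiveException, IOException {
        // Collector invoked for rows still cached in the heap when it is flushed.
        TopNHash.BinaryCollector collector = new TopNHash.BinaryCollector() {
          public void collect(byte[] key, byte[] value, int keyHash) {
            System.out.println("flush-forward " + new BytesWritable(key));
          }
        };
        TopNHash hash = new TopNHash();
        // keep at most 5 rows, budget 10% of the max heap, non-group-by (order by) mode
        hash.initialize(5, 0.1f, false, collector);

        for (int i = 0; i < 100; ++i) {
          BytesWritable key = new BytesWritable(("key-" + i).getBytes());
          BytesWritable value = new BytesWritable(("val-" + i).getBytes());
          int result = hash.tryStoreKey(key);
          if (result == TopNHash.FORWARD) {
            System.out.println("forward directly " + key); // hash was flushed or disabled
          } else if (result == TopNHash.EXCLUDED) {
            // row falls outside the current top-n: drop it
          } else {
            // row was admitted at the returned index; attach its value and key hash
            hash.storeValue(result, value, key.hashCode(), false);
          }
        }
        hash.flush(); // emit whatever is still cached in the heap
      }
    }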
/**
@@ -104,15 +292,14 @@ abstract class TopNHash {
* -3 for FLUSH : memory is not enough. flush values (keep keys only)
* -4 for DISABLE : hash is not effective. flush and disable it
*/
- public int indexOf(HiveKey key) {
- int size = size();
+ private int insertKeyIntoHeap(BinaryComparable key) {
if (usage > threshold) {
return excluded == 0 ? DISABLE : FLUSH;
}
+ int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
- hashes[index] = key.hashCode();
- if (!store(index)) {
+ if (null != indexes.store(index)) {
// it's only for GBY which should forward all values associated with the key in the range
of limit. new value should be attached to the key but in the current implementation,
only one value is allowed. with map-aggregation which is true by default,
@@ -120,7 +307,7 @@ abstract class TopNHash {
return FORWARD;
}
if (size == topN) {
- evicted = removeBiggest(); // remove the biggest key
+ evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
return EXCLUDED; // input key is bigger than any of keys in hash
@@ -130,130 +317,93 @@ abstract class TopNHash {
return index;
}
- protected abstract int size();
-
- protected abstract boolean store(int index);
-
- protected abstract int removeBiggest();
-
- protected abstract Iterable<Integer> indexes();
-
// key/value of the index is removed. retrieve memory usage
- public void removed(int index) {
+ private void removed(int index) {
usage -= keys[index].length;
keys[index] = null;
if (values[index] != null) {
- // value can be null if hash is flushed, which only keeps keys for limiting rows
usage -= values[index].length;
values[index] = null;
}
hashes[index] = -1;
}
- public void set(int index, BytesWritable value) {
- values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
- usage += keys[index].length + values[index].length;
- }
-
- public void flush() throws IOException {
- for (int index : indexes()) {
- flush(index);
+ private void flushInternal() throws IOException, HiveException {
+ for (int index : indexes.indexes()) {
+ if (index != evicted && values[index] != null) {
+ collector.collect(keys[index], values[index], hashes[index]);
+ usage -= values[index].length;
+ values[index] = null;
+ hashes[index] = -1;
+ }
}
excluded = 0;
}
- protected void flush(int index) throws IOException {
- if (index != evicted && values[index] != null) {
- // BytesWritable copies array for set method. So just creats new one
- HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
- BytesWritable valueWritable = new BytesWritable(values[index]);
- collector.collect(keyWritable, valueWritable);
- usage -= values[index].length;
- values[index] = null;
- }
- }
-}
-
-/**
- * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
- * MinMaxPriorityQueue is used because it alows duplication and fast access to biggest one
- */
-class HashForRow extends TopNHash {
-
- private final MinMaxPriorityQueue<Integer> indexes;
-
- HashForRow(int topN, long threshold, BinaryCollector collector) {
- super(topN, threshold, collector);
- this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
- }
-
- protected int size() {
- return indexes.size();
- }
-
- // returns true always
- protected boolean store(int index) {
- return indexes.add(index);
+ private interface IndexStore {
+ int size();
+ /**
+ * @return the index which caused the item to be rejected; or null if accepted
+ */
+ Integer store(int index);
+ int removeBiggest();
+ Iterable<Integer> indexes();
}
- protected int removeBiggest() {
- return indexes.removeLast();
- }
-
- protected Iterable<Integer> indexes() {
- Integer[] array = indexes.toArray(new Integer[indexes.size()]);
- Arrays.sort(array, 0, array.length, C);
- return Arrays.asList(array);
- }
-}
-
-/**
- * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
- * simple TreeMap is used because group by does not need keep duplicated keys
- */
-class HashForGroup extends TopNHash {
+ /**
+ * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
+ * MinMaxPriorityQueue is used because it allows duplicates and fast access to the biggest one
+ */
+ private class HashForRow implements IndexStore {
+ private final MinMaxPriorityQueue<Integer> indexes = MinMaxPriorityQueue.orderedBy(C).create();
- private final SortedSet<Integer> indexes;
+ public int size() {
+ return indexes.size();
+ }
- HashForGroup(int topN, long threshold, BinaryCollector collector) {
- super(topN, threshold, collector);
- this.indexes = new TreeSet<Integer>(C);
- }
+ // returns null always
+ public Integer store(int index) {
+ boolean result = indexes.add(index);
+ assert result;
+ return null;
+ }
- protected int size() {
- return indexes.size();
- }
+ public int removeBiggest() {
+ return indexes.removeLast();
+ }
- // returns false if index already exists in map
- protected boolean store(int index) {
- return indexes.add(index);
+ public Iterable<Integer> indexes() {
+ Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+ Arrays.sort(array, 0, array.length, C);
+ return Arrays.asList(array);
+ }
}
- protected int removeBiggest() {
- Integer last = indexes.last();
- indexes.remove(last);
- return last;
- }
+ /**
+ * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
+ * a simple TreeMap is used because group by does not need to keep duplicated keys
+ */
+ private class HashForGroup implements IndexStore {
+ // TreeSet is backed by a TreeMap anyway; use a plain TreeMap directly so the colliding value can be retrieved.
+ private final TreeMap<Integer, Integer> indexes = new TreeMap<Integer, Integer>(C);
- protected Iterable<Integer> indexes() {
- return indexes;
- }
-}
+ public int size() {
+ return indexes.size();
+ }
-class HashForLimit0 extends TopNHash {
+ // returns the previously stored index if the key already exists in the map; null otherwise
+ public Integer store(int index) {
+ return indexes.put(index, index);
+ }
- HashForLimit0() {
- super(0, 0, null);
- }
+ public int removeBiggest() {
+ Integer last = indexes.lastKey();
+ indexes.remove(last);
+ return last;
+ }
- @Override
- public int indexOf(HiveKey key) {
- return EXCLUDED;
+ public Iterable<Integer> indexes() {
+ return indexes.keySet();
+ }
}
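Why a TreeMap rather than the old TreeSet: put() hands back the previously stored index on a collision, which the new IndexStore.store() contract relies on, while TreeSet.add() only reports true/false. A tiny stand-alone illustration (not part of the patch; plain integer ordering here stands in for the byte-comparing comparator used above):

    import java.util.TreeMap;

    public class TreeMapCollisionSketch {
      public static void main(String[] args) {
        TreeMap<Integer, Integer> indexes = new TreeMap<Integer, Integer>();
        System.out.println(indexes.put(7, 7)); // null: first insertion, key accepted
        System.out.println(indexes.put(7, 7)); // 7: collision, caller learns which index it hit
      }
    }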
-
- protected int size() { return 0; }
- protected boolean store(int index) { return false; }
- protected int removeBiggest() { return 0; }
- protected Iterable<Integer> indexes() { return Collections.emptyList(); }
}
-
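A complementary sketch (again not part of the patch; class name, keys and values are invented) of the batched flow enabled by the new vectorized entry points, using group-by mode so that duplicate keys exercise the collision/forward path:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.TopNHash;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.io.BytesWritable;

    public class TopNHashBatchSketch {
      public static void main(String[] args) throws HiveException, IOException {
        TopNHash.BinaryCollector collector = new TopNHash.BinaryCollector() {
          public void collect(byte[] key, byte[] value, int keyHash) {
            System.out.println("flush-forward " + new BytesWritable(key));
          }
        };
        TopNHash hash = new TopNHash();
        hash.initialize(3, 0.1f, true, collector); // topN = 3, map group-by mode

        // A fake batch of 8 rows with deliberately duplicated keys.
        int batchSize = 8;
        BytesWritable[] keys = new BytesWritable[batchSize];
        BytesWritable[] values = new BytesWritable[batchSize];
        for (int i = 0; i < batchSize; ++i) {
          keys[i] = new BytesWritable(("key-" + (i % 4)).getBytes());
          values[i] = new BytesWritable(("val-" + i).getBytes());
        }

        int[] results = new int[batchSize];
        int batchStatus = hash.startVectorizedBatch();
        if (batchStatus == TopNHash.FORWARD) {
          System.out.println("hash disabled - forward the whole batch");
        } else if (batchStatus == TopNHash.EXCLUDED) {
          System.out.println("limit 0 - drop the whole batch");
        } else {
          for (int i = 0; i < batchSize; ++i) {
            hash.tryStoreVectorizedKey(keys[i], i, results);
          }
          for (int i = 0; i < batchSize; ++i) {
            if (results[i] == TopNHash.EXCLUDED) {
              continue;                        // lost the top-n race, drop the row
            } else if (results[i] < 0) {
              // collided with a key already in the heap: forward using that heap key
              byte[] heapKey = hash.getVectorizedKeyToForward(results[i]);
              System.out.println("forward row " + i + " with " + new BytesWritable(heapKey));
            } else {
              hash.storeValue(results[i], values[i], keys[i].hashCode(), true);
            }
          }
        }
        hash.flush();
      }
    }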
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 5 07:01:32 2013
@@ -94,6 +94,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -1641,30 +1642,6 @@ public final class Utilities {
}
}
- /**
- * Get all file status from a root path and recursively go deep into certain levels.
- *
- * @param path
- * the root path
- * @param level
- * the depth of directory should explore
- * @param fs
- * the file system
- * @return array of FileStatus
- * @throws IOException
- */
- public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
- throws IOException {
-
- // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
- StringBuilder sb = new StringBuilder(path.toUri().getPath());
- for (int i = 0; i < level; ++i) {
- sb.append(Path.SEPARATOR).append("*");
- }
- Path pathPattern = new Path(path, sb.toString());
- return fs.globStatus(pathPattern);
- }
-
public static void mvFileToFinalPath(String specPath, Configuration hconf,
boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
Reporter reporter) throws IOException,
@@ -1770,7 +1747,7 @@ public final class Utilities {
ArrayList<String> result = new ArrayList<String>();
if (dpCtx != null) {
- FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+ FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
HashMap<String, FileStatus> taskIDToFile = null;
for (int i = 0; i < parts.length; ++i) {
@@ -2291,7 +2268,7 @@ public final class Utilities {
Path loadPath = new Path(dpCtx.getRootPath());
FileSystem fs = loadPath.getFileSystem(conf);
int numDPCols = dpCtx.getNumDPCols();
- FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
+ FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs);
if (status.length == 0) {
LOG.warn("No partition is generated by dynamic partitioning");
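The helper removed from Utilities above globs `level` directories deep under the root (building a pattern like /root/*/* and calling globStatus); the updated call sites suggest the relocated HiveStatsUtils.getFileStatusRecurse keeps the same signature and behavior. A hypothetical call site (the path and partition depth below are invented):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.HiveStatsUtils;

    public class DynamicPartitionListingSketch {
      public static void main(String[] args) throws IOException {
        Path root = new Path("/warehouse/t");                  // made-up table location
        FileSystem fs = root.getFileSystem(new Configuration());
        // depth 2 matches a table partitioned by two dynamic-partition columns,
        // i.e. the glob /warehouse/t/*/* built internally by the helper
        FileStatus[] parts = HiveStatsUtils.getFileStatusRecurse(root, 2, fs);
        for (FileStatus part : parts) {
          System.out.println(part.getPath());
        }
      }
    }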
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 5 07:01:32 2013
@@ -766,8 +766,7 @@ public class ExecDriver extends Task<Map
if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
continue;
}
-
- tempConf.set(oneProp, deltaP.getProperty(oneProp));
+ tempConf.set(oneProp, hconf.get(oneProp));
}
// Multiple concurrent local mode job submissions can cause collisions in
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Nov 5 07:01:32 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;