Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [7/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoo...
Modified: hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java (original)
+++ hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java Thu Oct 30 16:22:33 2014
@@ -23,13 +23,16 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
@@ -60,52 +63,70 @@ public class DummyListener extends MetaS
@Override
public void onConfigChange(ConfigChangeEvent configChange) {
- notifyList.add(configChange);
+ addEvent(configChange);
}
@Override
public void onAddPartition(AddPartitionEvent partition) throws MetaException {
- notifyList.add(partition);
+ addEvent(partition);
}
@Override
public void onCreateDatabase(CreateDatabaseEvent db) throws MetaException {
- notifyList.add(db);
+ addEvent(db);
}
@Override
public void onCreateTable(CreateTableEvent table) throws MetaException {
- notifyList.add(table);
+ addEvent(table);
}
@Override
public void onDropDatabase(DropDatabaseEvent db) throws MetaException {
- notifyList.add(db);
+ addEvent(db);
}
@Override
public void onDropPartition(DropPartitionEvent partition) throws MetaException {
- notifyList.add(partition);
+ addEvent(partition);
}
@Override
public void onDropTable(DropTableEvent table) throws MetaException {
- notifyList.add(table);
+ addEvent(table);
}
@Override
public void onAlterTable(AlterTableEvent event) throws MetaException {
- notifyList.add(event);
+ addEvent(event);
}
@Override
public void onAlterPartition(AlterPartitionEvent event) throws MetaException {
- notifyList.add(event);
+ addEvent(event);
}
@Override
public void onLoadPartitionDone(LoadPartitionDoneEvent partEvent) throws MetaException {
- notifyList.add(partEvent);
+ addEvent(partEvent);
}
+ @Override
+ public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
+ addEvent(indexEvent);
+ }
+
+ @Override
+ public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
+ addEvent(indexEvent);
+ }
+
+ @Override
+ public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
+ addEvent(indexEvent);
+ }
+
+ private void addEvent(ListenerEvent event) {
+ notifyList.add(event);
+ }
}
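
The listener above now funnels every callback through a single private addEvent(ListenerEvent) helper instead of touching notifyList directly in each method. For context, a minimal sketch of how a test might wire such a listener into the metastore follows; the configuration property name and the surrounding harness are assumptions for illustration, not part of this commit.

import org.apache.hadoop.hive.conf.HiveConf;

public class ListenerWiringSketch {
  // Builds a HiveConf that registers DummyListener so the embedded metastore
  // invokes its on* callbacks; assumes the hive.metastore.event.listeners key.
  public static HiveConf confWithDummyListener() {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.event.listeners",
        "org.apache.hadoop.hive.metastore.DummyListener");
    return conf;
  }
}

With a conf like this in place, a test can run DDL statements and then assert on the events accumulated in notifyList.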
Modified: hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java (original)
+++ hive/branches/spark/metastore/src/test/org/apache/hadoop/hive/metastore/DummyPreListener.java Thu Oct 30 16:22:33 2014
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.
*
* DummyPreListener.
*
- * An implemenation of MetaStorePreEventListener which stores the Events it's seen in a list.
+ * An implementation of MetaStorePreEventListener which stores the Events it's seen in a list.
*/
public class DummyPreListener extends MetaStorePreEventListener {
Modified: hive/branches/spark/odbc/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/odbc/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/odbc/pom.xml (original)
+++ hive/branches/spark/odbc/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/packaging/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/packaging/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/packaging/pom.xml (original)
+++ hive/branches/spark/packaging/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Thu Oct 30 16:22:33 2014
@@ -21,7 +21,7 @@
</parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Hive</name>
@@ -53,7 +53,7 @@
</modules>
<properties>
- <hive.version.shortname>0.14.0</hive.version.shortname>
+ <hive.version.shortname>0.15.0</hive.version.shortname>
<!-- Build Properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -133,7 +133,7 @@
<jms.version>1.1</jms.version>
<jodd.version>3.5.2</jodd.version>
<json.version>20090211</json.version>
- <junit.version>4.10</junit.version>
+ <junit.version>4.11</junit.version>
<kryo.version>2.22</kryo.version>
<libfb303.version>0.9.0</libfb303.version>
<libthrift.version>0.9.0</libthrift.version>
@@ -152,7 +152,7 @@
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
- <tez.version>0.5.1</tez.version>
+ <tez.version>0.5.2-SNAPSHOT</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<spark.version>1.2.0-SNAPSHOT</spark.version>
<scala.binary.version>2.10</scala.binary.version>
@@ -165,6 +165,7 @@
<zookeeper.version>3.4.6</zookeeper.version>
<jpam.version>1.1</jpam.version>
<felix.version>2.4.0</felix.version>
+ <curator.version>2.6.0</curator.version>
</properties>
<repositories>
@@ -486,7 +487,13 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>
Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -28,7 +28,7 @@
<name>Hive Query Language</name>
<properties>
- <optiq.version>0.9.1-incubating-SNAPSHOT</optiq.version>
+ <calcite.version>0.9.2-incubating-SNAPSHOT</calcite.version>
<hive.path.to.root>..</hive.path.to.root>
</properties>
@@ -183,9 +183,9 @@
<version>${datanucleus-core.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.optiq</groupId>
- <artifactId>optiq-core</artifactId>
- <version>${optiq.version}</version>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
<exclusions>
<!-- hsqldb interferes with the use of derby as the default db
in hive's use of datanucleus.
@@ -201,9 +201,9 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.optiq</groupId>
- <artifactId>optiq-avatica</artifactId>
- <version>${optiq.version}</version>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-avatica</artifactId>
+ <version>${calcite.version}</version>
<exclusions>
<!-- hsqldb interferes with the use of derby as the default db
in hive's use of datanucleus.
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Oct 30 16:22:33 2014
@@ -18,8 +18,10 @@
package org.apache.hadoop.hive.ql;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
+import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -69,7 +72,6 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
@@ -409,6 +411,8 @@ public class Driver implements CommandPr
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
+ hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
+ hookCtx.setCommand(command);
for (HiveSemanticAnalyzerHook hook : saHooks) {
tree = hook.preAnalyze(hookCtx, tree);
}
@@ -465,6 +469,13 @@ public class Driver implements CommandPr
}
}
+ if (conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
+ String explainOutput = getExplainOutput(sem, plan, tree.dump());
+ if (explainOutput != null) {
+ LOG.info("EXPLAIN output: " + explainOutput);
+ }
+ }
+
return 0;
} catch (Exception e) {
ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
@@ -492,6 +503,33 @@ public class Driver implements CommandPr
}
/**
+ * Returns EXPLAIN EXTENDED output for a semantically
+ * analyzed query.
+ *
+ * @param sem semantic analyzer for analyzed query
+ * @param plan query plan
+ * @param astStringTree AST tree dump
+ * @throws java.io.IOException
+ */
+ private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan,
+ String astStringTree) throws IOException {
+ String ret = null;
+ ExplainTask task = new ExplainTask();
+ task.initialize(conf, plan, null);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ try {
+ task.getJSONPlan(ps, astStringTree, sem.getRootTasks(), sem.getFetchTask(),
+ false, true, true);
+ ret = baos.toString();
+ } catch (Exception e) {
+ LOG.warn("Exception generating explain output: " + e, e);
+ }
+
+ return ret;
+ }
+
+ /**
* Do authorization using post semantic analysis information in the semantic analyzer
* The original command is also passed so that authorization interface can provide
* more useful information in logs.
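
The new getExplainOutput method renders the plan by pointing an ExplainTask at an in-memory PrintStream and reading the buffer back as a String, and Driver only logs that output when the HIVE_LOG_EXPLAIN_OUTPUT flag is enabled. The capture idiom itself is generic; a self-contained sketch follows (the helper name and Consumer signature are illustrative, not from this commit):

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.function.Consumer;

public class CaptureToStringSketch {
  // Runs the given writer against an in-memory stream and returns what it printed.
  static String capture(Consumer<PrintStream> writer) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    PrintStream ps = new PrintStream(baos);
    writer.accept(ps);
    ps.flush();
    return baos.toString();
  }

  public static void main(String[] args) {
    String out = capture(ps -> ps.println("EXPLAIN STAGE PLANS ..."));
    System.out.print(out);
  }
}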
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Thu Oct 30 16:22:33 2014
@@ -421,6 +421,8 @@ public enum ErrorMsg {
"an AcidOutputFormat or is not bucketed", true),
ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
"sorted, table {0}", true),
+ ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,
+ "Alter table partition type {0} does not allow partial partition spec", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Thu Oct 30 16:22:33 2014
@@ -208,6 +208,7 @@ public class ColumnInfo implements Seria
return false;
}
+ // TODO: why does this not compare tabAlias?
ColumnInfo dest = (ColumnInfo)obj;
if ((!checkEquals(internalName, dest.getInternalName())) ||
(!checkEquals(alias, dest.getAlias())) ||
@@ -221,6 +222,18 @@ public class ColumnInfo implements Seria
return true;
}
+ public boolean isSameColumnForRR(ColumnInfo other) {
+ return checkEquals(tabAlias, other.tabAlias)
+ && checkEquals(alias, other.alias)
+ && checkEquals(internalName, other.internalName)
+ && checkEquals(getType(), other.getType());
+ }
+
+ public String toMappingString(String tab, String col) {
+ return tab + "." + col + " => {" + tabAlias + ", " + alias + ", "
+ + internalName + ": " + getType() + "}";
+ }
+
public void setObjectinspector(ObjectInspector writableObjectInspector) {
this.objectInspector = writableObjectInspector;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -64,7 +62,6 @@ public class CommonMergeJoinOperator ext
private static final long serialVersionUID = 1L;
private boolean isBigTableWork;
private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName());
- private Map<Integer, String> aliasToInputNameMap;
transient List<Object>[] keyWritables;
transient List<Object>[] nextKeyWritables;
transient RowContainer<List<Object>>[] nextGroupStorage;
@@ -309,6 +306,8 @@ public class CommonMergeJoinOperator ext
public void closeOp(boolean abort) throws HiveException {
joinFinalLeftData();
+ super.closeOp(abort);
+
// clean up
for (int pos = 0; pos < order.length; pos++) {
if (pos != posBigTable) {
@@ -365,6 +364,9 @@ public class CommonMergeJoinOperator ext
joinOneGroup();
dataInCache = false;
for (byte pos = 0; pos < order.length; pos++) {
+ if (candidateStorage[pos] == null) {
+ continue;
+ }
if (this.candidateStorage[pos].rowCount() > 0) {
dataInCache = true;
break;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Thu Oct 30 16:22:33 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
import org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
@@ -153,8 +154,10 @@ import org.apache.hadoop.hive.ql.securit
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveV1Authorizer;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -183,11 +186,13 @@ import java.io.Serializable;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -778,7 +783,7 @@ public class DDLTask extends Task<DDLWor
}
private int dropIndex(Hive db, DropIndexDesc dropIdx) throws HiveException {
- db.dropIndex(dropIdx.getTableName(), dropIdx.getIndexName(), true);
+ db.dropIndex(dropIdx.getTableName(), dropIdx.getIndexName(), dropIdx.isThrowException(), true);
return 0;
}
@@ -2201,7 +2206,7 @@ public class DDLTask extends Task<DDLWor
* Throws this exception if an unexpected error occurs.
*/
private int showTables(Hive db, ShowTablesDesc showTbls) throws HiveException {
- // get the tables for the desired pattenn - populate the output stream
+ // get the tables for the desired pattern - populate the output stream
List<String> tbls = null;
String dbName = showTbls.getDbName();
@@ -2278,7 +2283,12 @@ public class DDLTask extends Task<DDLWor
Set<String> funcs = null;
if (showFuncs.getPattern() != null) {
LOG.info("pattern: " + showFuncs.getPattern());
- funcs = FunctionRegistry.getFunctionNames(showFuncs.getPattern());
+ if (showFuncs.getIsLikePattern()) {
+ funcs = FunctionRegistry.getFunctionNamesByLikePattern(showFuncs.getPattern());
+ } else {
+ console.printInfo("SHOW FUNCTIONS is deprecated, please use SHOW FUNCTIONS LIKE instead.");
+ funcs = FunctionRegistry.getFunctionNames(showFuncs.getPattern());
+ }
LOG.info("results : " + funcs.size());
} else {
funcs = FunctionRegistry.getFunctionNames();
@@ -2802,7 +2812,7 @@ public class DDLTask extends Task<DDLWor
* is the function we are describing
* @throws HiveException
*/
- private int describeFunction(DescFunctionDesc descFunc) throws HiveException {
+ private int describeFunction(DescFunctionDesc descFunc) throws HiveException, SQLException {
String funcName = descFunc.getName();
// write the results in the file
@@ -3083,6 +3093,15 @@ public class DDLTask extends Task<DDLWor
List<FieldSchema> cols = null;
List<ColumnStatisticsObj> colStats = null;
+
+ Deserializer deserializer = tbl.getDeserializer(true);
+ if (deserializer instanceof AbstractSerDe) {
+ String errorMsgs = ((AbstractSerDe) deserializer).getConfigurationErrors();
+ if (errorMsgs != null && !errorMsgs.isEmpty()) {
+ throw new SQLException(errorMsgs);
+ }
+ }
+
if (colPath.equals(tableName)) {
cols = (part == null || tbl.getTableType() == TableType.VIRTUAL_VIEW) ?
tbl.getCols() : part.getCols();
@@ -3091,7 +3110,7 @@ public class DDLTask extends Task<DDLWor
cols.addAll(tbl.getPartCols());
}
} else {
- cols = Hive.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+ cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
if (descTbl.isFormatted()) {
// when column name is specified in describe table DDL, colPath will
// will be table_name.column_name
@@ -3121,6 +3140,8 @@ public class DDLTask extends Task<DDLWor
outStream.close();
outStream = null;
+ } catch (SQLException e) {
+ throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
} catch (IOException e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
} finally {
@@ -3259,22 +3280,77 @@ public class DDLTask extends Task<DDLWor
// alter the table
Table tbl = db.getTable(alterTbl.getOldName());
- Partition part = null;
List<Partition> allPartitions = null;
if (alterTbl.getPartSpec() != null) {
- if (alterTbl.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) {
- part = db.getPartition(tbl, alterTbl.getPartSpec(), false);
+ Map<String, String> partSpec = alterTbl.getPartSpec();
+ if (DDLSemanticAnalyzer.isFullSpec(tbl, partSpec)) {
+ allPartitions = new ArrayList<Partition>();
+ Partition part = db.getPartition(tbl, partSpec, false);
if (part == null) {
+ // User provided a fully specified partition spec but it doesn't exist, fail.
throw new HiveException(ErrorMsg.INVALID_PARTITION,
- StringUtils.join(alterTbl.getPartSpec().keySet(), ',') + " for table " + alterTbl.getOldName());
+ StringUtils.join(alterTbl.getPartSpec().keySet(), ',') + " for table " + alterTbl.getOldName());
+
}
- }
- else {
+ allPartitions.add(part);
+ } else {
+ // DDLSemanticAnalyzer has already checked if partial partition specs are allowed,
+ // thus we should not need to check it here.
allPartitions = db.getPartitions(tbl, alterTbl.getPartSpec());
}
}
Table oldTbl = tbl.copy();
+ if (allPartitions != null) {
+ // Alter all partitions
+ for (Partition part : allPartitions) {
+ alterTableOrSinglePartition(alterTbl, tbl, part);
+ }
+ } else {
+ // Just alter the table
+ alterTableOrSinglePartition(alterTbl, tbl, null);
+ }
+
+ if (allPartitions == null) {
+ updateModifiedParameters(tbl.getTTable().getParameters(), conf);
+ tbl.checkValidity();
+ } else {
+ for (Partition tmpPart: allPartitions) {
+ updateModifiedParameters(tmpPart.getParameters(), conf);
+ }
+ }
+
+ try {
+ if (allPartitions == null) {
+ db.alterTable(alterTbl.getOldName(), tbl);
+ } else {
+ db.alterPartitions(tbl.getTableName(), allPartitions);
+ }
+ } catch (InvalidOperationException e) {
+ LOG.error("alter table: " + stringifyException(e));
+ throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
+ }
+
+ // This is kind of hacky - the read entity contains the old table, whereas
+ // the write entity
+ // contains the new table. This is needed for rename - both the old and the
+ // new table names are
+ // passed
+ // Don't acquire locks for any of these, we have already asked for them in DDLSemanticAnalyzer.
+ if (allPartitions != null ) {
+ for (Partition tmpPart: allPartitions) {
+ work.getInputs().add(new ReadEntity(tmpPart));
+ work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
+ }
+ } else {
+ work.getInputs().add(new ReadEntity(oldTbl));
+ work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+ }
+ return 0;
+ }
+
+ private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part)
+ throws HiveException {
List<FieldSchema> oldCols = (part == null ? tbl.getCols() : part.getCols());
StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
@@ -3420,12 +3496,10 @@ public class DDLTask extends Task<DDLWor
AlterTableDesc.ProtectModeType protectMode = alterTbl.getProtectModeType();
ProtectMode mode = null;
- if (allPartitions != null) {
- for (Partition tmpPart: allPartitions) {
- mode = tmpPart.getProtectMode();
- setAlterProtectMode(protectModeEnable, protectMode, mode);
- tmpPart.setProtectMode(mode);
- }
+ if (part != null) {
+ mode = part.getProtectMode();
+ setAlterProtectMode(protectModeEnable, protectMode, mode);
+ part.setProtectMode(mode);
} else {
mode = tbl.getProtectMode();
setAlterProtectMode(protectModeEnable,protectMode, mode);
@@ -3468,12 +3542,12 @@ public class DDLTask extends Task<DDLWor
throw new HiveException(e);
}
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSKEWEDBY) {
- /* Validation's been done at compile time. no validation is needed here. */
+ // Validation's been done at compile time. no validation is needed here.
List<String> skewedColNames = null;
List<List<String>> skewedValues = null;
if (alterTbl.isTurnOffSkewed()) {
- /* Convert skewed table to non-skewed table. */
+ // Convert skewed table to non-skewed table.
skewedColNames = new ArrayList<String>();
skewedValues = new ArrayList<List<String>>();
} else {
@@ -3482,7 +3556,7 @@ public class DDLTask extends Task<DDLWor
}
if ( null == tbl.getSkewedInfo()) {
- /* Convert non-skewed table to skewed table. */
+ // Convert non-skewed table to skewed table.
SkewedInfo skewedInfo = new SkewedInfo();
skewedInfo.setSkewedColNames(skewedColNames);
skewedInfo.setSkewedColValues(skewedValues);
@@ -3524,59 +3598,12 @@ public class DDLTask extends Task<DDLWor
}
tbl.setNumBuckets(alterTbl.getNumberBuckets());
}
- } else {
+ } else {
throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString());
}
- if (part == null && allPartitions == null) {
- updateModifiedParameters(tbl.getTTable().getParameters(), conf);
- tbl.checkValidity();
- } else if (part != null) {
- updateModifiedParameters(part.getParameters(), conf);
- }
- else {
- for (Partition tmpPart: allPartitions) {
- updateModifiedParameters(tmpPart.getParameters(), conf);
- }
- }
-
- try {
- if (part == null && allPartitions == null) {
- db.alterTable(alterTbl.getOldName(), tbl);
- } else if (part != null) {
- db.alterPartition(tbl.getTableName(), part);
- }
- else {
- db.alterPartitions(tbl.getTableName(), allPartitions);
- }
- } catch (InvalidOperationException e) {
- LOG.info("alter table: " + stringifyException(e));
- throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
- }
-
- // This is kind of hacky - the read entity contains the old table, whereas
- // the write entity
- // contains the new table. This is needed for rename - both the old and the
- // new table names are
- // passed
- // Don't acquire locks for any of these, we have already asked for them in DDLSemanticAnalyzer.
- if(part != null) {
- work.getInputs().add(new ReadEntity(part));
- work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
- }
- else if (allPartitions != null ){
- for (Partition tmpPart: allPartitions) {
- work.getInputs().add(new ReadEntity(tmpPart));
- work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
- }
- }
- else {
- work.getInputs().add(new ReadEntity(oldTbl));
- work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
- }
return 0;
}
-
/**
* Drop a given table or some partitions. DropTableDesc is currently used for both.
*
@@ -3896,7 +3923,7 @@ public class DDLTask extends Task<DDLWor
tbl.setInputFormatClass(crtTbl.getInputFormat());
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
- // only persist input/ouput format to metadata when it is explicitly specified.
+ // only persist input/output format to metadata when it is explicitly specified.
// Otherwise, load lazily via StorageHandler at query time.
if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) {
tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName());
@@ -3965,7 +3992,7 @@ public class DDLTask extends Task<DDLWor
* @throws HiveException
* Throws this exception if an unexpected error occurs.
*/
- private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws HiveException {
+ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws Exception {
// Get the existing table
Table oldtbl = db.getTable(crtTbl.getLikeTableName());
Table tbl;
@@ -4031,12 +4058,22 @@ public class DDLTask extends Task<DDLWor
tbl.unsetDataLocation();
}
+ Class<? extends Deserializer> serdeClass = oldtbl.getDeserializerClass();
+
Map<String, String> params = tbl.getParameters();
// We should copy only those table parameters that are specified in the config.
+ SerDeSpec spec = AnnotationUtils.getAnnotation(serdeClass, SerDeSpec.class);
String paramsStr = HiveConf.getVar(conf, HiveConf.ConfVars.DDL_CTL_PARAMETERS_WHITELIST);
+
+ Set<String> retainer = new HashSet<String>();
+ if (spec != null && spec.schemaProps() != null) {
+ retainer.addAll(Arrays.asList(spec.schemaProps()));
+ }
if (paramsStr != null) {
- List<String> paramsList = Arrays.asList(paramsStr.split(","));
- params.keySet().retainAll(paramsList);
+ retainer.addAll(Arrays.asList(paramsStr.split(",")));
+ }
+ if (!retainer.isEmpty()) {
+ params.keySet().retainAll(retainer);
} else {
params.clear();
}
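
In the createTableLike change above, the copied table's parameters are filtered through a "retainer" set built from the SerDe's SerDeSpec schemaProps plus the DDL_CTL_PARAMETERS_WHITELIST config; if both sources are empty, all parameters are dropped. A standalone sketch of that retain-by-whitelist step, with hypothetical inputs:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ParamWhitelistSketch {
  // Keeps only the parameters whose keys appear in either whitelist source;
  // if both sources are empty, drops everything (mirroring the patched logic).
  static void retainWhitelisted(Map<String, String> params,
      String[] schemaProps, String commaSeparatedWhitelist) {
    Set<String> retainer = new HashSet<String>();
    if (schemaProps != null) {
      retainer.addAll(Arrays.asList(schemaProps));
    }
    if (commaSeparatedWhitelist != null && !commaSeparatedWhitelist.isEmpty()) {
      retainer.addAll(Arrays.asList(commaSeparatedWhitelist.split(",")));
    }
    if (!retainer.isEmpty()) {
      params.keySet().retainAll(retainer);
    } else {
      params.clear();
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<String, String>();
    params.put("serialization.format", "1");
    params.put("transient_lastDdlTime", "1414685000");
    retainWhitelisted(params, new String[] {"serialization.format"}, "");
    System.out.println(params); // prints {serialization.format=1}
  }
}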
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Oct 30 16:22:33 2014
@@ -30,7 +30,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -66,6 +66,7 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
@@ -219,27 +220,32 @@ public class FetchOperator implements Se
/**
* A cache of InputFormat instances.
*/
- private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
+ private static final Map<String, InputFormat> inputFormats = new HashMap<String, InputFormat>();
@SuppressWarnings("unchecked")
- static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
- Configuration conf) throws IOException {
- if (!inputFormats.containsKey(inputFormatClass)) {
+ static InputFormat getInputFormatFromCache(Class<? extends InputFormat> inputFormatClass,
+ JobConf conf) throws IOException {
+ if (Configurable.class.isAssignableFrom(inputFormatClass) ||
+ JobConfigurable.class.isAssignableFrom(inputFormatClass)) {
+ return (InputFormat<WritableComparable, Writable>) ReflectionUtils
+ .newInstance(inputFormatClass, conf);
+ }
+ InputFormat format = inputFormats.get(inputFormatClass.getName());
+ if (format == null) {
try {
- InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
- .newInstance(inputFormatClass, conf);
- inputFormats.put(inputFormatClass, newInstance);
+ format = ReflectionUtils.newInstance(inputFormatClass, conf);
+ inputFormats.put(inputFormatClass.getName(), format);
} catch (Exception e) {
throw new IOException("Cannot create an instance of InputFormat class "
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return inputFormats.get(inputFormatClass);
+ return format;
}
private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
Deserializer serde = table.getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
+ SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
}
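
The FetchOperator change stops caching InputFormats that implement Configurable or JobConfigurable, since those instances capture per-job configuration, and keys the remaining cache by class name instead of by Class object. A minimal sketch of that caching policy in plain Java; the HoldsJobState marker interface stands in for Configurable/JobConfigurable and is purely illustrative:

import java.util.HashMap;
import java.util.Map;

public class InstanceCacheSketch {
  // Illustrative stand-in for Configurable/JobConfigurable: implementations
  // hold per-job state and therefore must not be shared across calls.
  interface HoldsJobState {}

  private static final Map<String, Object> CACHE = new HashMap<String, Object>();

  static <T> T getInstance(Class<T> clazz) throws Exception {
    // Never cache stateful implementations; a shared instance could leak
    // configuration from one job into another.
    if (HoldsJobState.class.isAssignableFrom(clazz)) {
      return clazz.newInstance();
    }
    Object cached = CACHE.get(clazz.getName());
    if (cached == null) {
      cached = clazz.newInstance();
      CACHE.put(clazz.getName(), cached);
    }
    return clazz.cast(cached);
  }

  static class CachedThing {}

  public static void main(String[] args) throws Exception {
    // Stateless classes come back as the same shared instance.
    System.out.println(getInstance(CachedThing.class) == getInstance(CachedThing.class));
  }
}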
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Oct 30 16:22:33 2014
@@ -109,6 +109,15 @@ public class FileSinkOperator extends Te
private StructField bucketField; // field bucket is in in record id
private StructObjectInspector recIdInspector; // OI for inspecting record id
private IntObjectInspector bucketInspector; // OI for inspecting bucket id
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+
+ /**
+ * Counters.
+ */
+ public static enum Counter {
+ RECORDS_OUT
+ }
/**
* RecordWriter.
@@ -249,7 +258,7 @@ public class FileSinkOperator extends Te
private static final long serialVersionUID = 1L;
protected transient FileSystem fs;
protected transient Serializer serializer;
- protected transient LongWritable row_count;
+ protected final transient LongWritable row_count = new LongWritable();
private transient boolean isNativeTable = true;
/**
@@ -352,7 +361,7 @@ public class FileSinkOperator extends Te
prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
jc.getPartitionerClass(), null);
}
- row_count = new LongWritable();
+
if (dpCtx != null) {
dpSetup();
}
@@ -381,6 +390,15 @@ public class FileSinkOperator extends Te
bucketField = recIdInspector.getAllStructFieldRefs().get(1);
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
+
+ numRows = 0;
+
+ String context = jc.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_OUT + context, row_count);
+
initializeChildren(hconf);
} catch (HiveException e) {
throw e;
@@ -657,9 +675,9 @@ public class FileSinkOperator extends Te
fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
}
-
- if (row_count != null) {
- row_count.set(row_count.get() + 1);
+ if (++numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records written - " + numRows);
}
int writerOffset = findWriterOffset(row);
@@ -921,6 +939,9 @@ public class FileSinkOperator extends Te
@Override
public void closeOp(boolean abort) throws HiveException {
+ row_count.set(numRows);
+ LOG.info(toString() + ": records written - " + numRows);
+
if (!bDynParts && !filesCreated) {
createBucketFiles(fsp);
}
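
FileSinkOperator now tracks output rows with a numRows/cntr pair and logs progress only when the count reaches the next power of ten, instead of bumping a writable counter on every row; the final count is flushed into row_count at closeOp. A standalone sketch of that logging cadence:

public class ExponentialLogCadenceSketch {
  private long numRows = 0;
  private long cntr = 1;

  // Returns true when a progress line should be emitted for this row.
  boolean recordAndCheck() {
    if (++numRows == cntr) {
      cntr *= 10;   // next log point: 1, 10, 100, 1000, ...
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    ExponentialLogCadenceSketch s = new ExponentialLogCadenceSketch();
    for (int i = 0; i < 1500; i++) {
      if (s.recordAndCheck()) {
        System.out.println("records written - " + s.numRows);
      }
    }
    // prints at rows 1, 10, 100, and 1000
  }
}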
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Thu Oct 30 16:22:33 2014
@@ -37,29 +37,14 @@ public class FilterOperator extends Oper
Serializable {
private static final long serialVersionUID = 1L;
-
- /**
- * Counter.
- *
- */
- public static enum Counter {
- FILTERED, PASSED
- }
-
- protected final transient LongWritable filtered_count;
- protected final transient LongWritable passed_count;
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
- private transient int consecutiveFails;
private transient int consecutiveSearches;
private transient IOContext ioContext;
protected transient int heartbeatInterval;
public FilterOperator() {
super();
- filtered_count = new LongWritable();
- passed_count = new LongWritable();
- consecutiveFails = 0;
consecutiveSearches = 0;
}
@@ -73,8 +58,6 @@ public class FilterOperator extends Oper
conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
}
- statsMap.put(Counter.FILTERED, filtered_count);
- statsMap.put(Counter.PASSED, passed_count);
conditionInspector = null;
ioContext = IOContext.get(hconf);
} catch (Throwable e) {
@@ -135,17 +118,6 @@ public class FilterOperator extends Oper
.getPrimitiveJavaObject(condition);
if (Boolean.TRUE.equals(ret)) {
forward(row, rowInspector);
- passed_count.set(passed_count.get() + 1);
- consecutiveFails = 0;
- } else {
- filtered_count.set(filtered_count.get() + 1);
- consecutiveFails++;
-
- // In case of a lot of consecutive failures, send a heartbeat in order to
- // avoid timeout
- if (((consecutiveFails % heartbeatInterval) == 0) && (reporter != null)) {
- reporter.progress();
- }
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Oct 30 16:22:33 2014
@@ -500,7 +500,7 @@ public final class FunctionRegistry {
Class<? extends GenericUDF> genericUDFClass) {
if (GenericUDF.class.isAssignableFrom(genericUDFClass)) {
FunctionInfo fI = new FunctionInfo(isNative, functionName,
- (GenericUDF) ReflectionUtils.newInstance(genericUDFClass, null));
+ ReflectionUtils.newInstance(genericUDFClass, null));
mFunctions.put(functionName.toLowerCase(), fI);
registerNativeStatus(fI);
} else {
@@ -523,7 +523,7 @@ public final class FunctionRegistry {
Class<? extends GenericUDTF> genericUDTFClass) {
if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) {
FunctionInfo fI = new FunctionInfo(isNative, functionName,
- (GenericUDTF) ReflectionUtils.newInstance(genericUDTFClass, null));
+ ReflectionUtils.newInstance(genericUDTFClass, null));
mFunctions.put(functionName.toLowerCase(), fI);
registerNativeStatus(fI);
} else {
@@ -534,7 +534,7 @@ public final class FunctionRegistry {
private static FunctionInfo getFunctionInfoFromMetastore(String functionName) {
FunctionInfo ret = null;
-
+
try {
String dbName;
String fName;
@@ -577,7 +577,7 @@ public final class FunctionRegistry {
// Lookup of UDf class failed
LOG.error("Unable to load UDF class: " + e);
}
-
+
return ret;
}
@@ -599,7 +599,7 @@ public final class FunctionRegistry {
if (functionInfo != null) {
loadFunctionResourcesIfNecessary(functionName, functionInfo);
}
-
+
return functionInfo;
}
@@ -732,6 +732,34 @@ public final class FunctionRegistry {
}
/**
+ * Returns a set of registered function names which match the given pattern.
+ * This is used for the CLI command "SHOW FUNCTIONS LIKE 'regular expression';"
+ *
+ * @param funcPatternStr
+ * regular expression of the interested function names
+ * @return set of strings contains function names
+ */
+ public static Set<String> getFunctionNamesByLikePattern(String funcPatternStr) {
+ Set<String> funcNames = new TreeSet<String>();
+ Set<String> allFuncs = getFunctionNames(true);
+ String[] subpatterns = funcPatternStr.trim().split("\\|");
+ for (String subpattern : subpatterns) {
+ subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
+ try {
+ Pattern patternObj = Pattern.compile(subpattern);
+ for (String funcName : allFuncs) {
+ if (patternObj.matcher(funcName).matches()) {
+ funcNames.add(funcName);
+ }
+ }
+ } catch (PatternSyntaxException e) {
+ continue;
+ }
+ }
+ return funcNames;
+ }
+
+ /**
* Returns the set of synonyms of the supplied function.
*
* @param funcName
@@ -954,6 +982,12 @@ public final class FunctionRegistry {
(PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
}
+ // Another special case, because timestamp is not implicitly convertible to numeric types.
+ if ((pgA == PrimitiveGrouping.NUMERIC_GROUP || pgB == PrimitiveGrouping.NUMERIC_GROUP)
+ && (pcA == PrimitiveCategory.TIMESTAMP || pcB == PrimitiveCategory.TIMESTAMP)) {
+ return TypeInfoFactory.doubleTypeInfo;
+ }
+
for (PrimitiveCategory t : numericTypeList) {
if (FunctionRegistry.implicitConvertible(pcA, t)
&& FunctionRegistry.implicitConvertible(pcB, t)) {
@@ -984,7 +1018,7 @@ public final class FunctionRegistry {
// If either is not a numeric type, return null.
return null;
}
-
+
return (ai > bi) ? pcA : pcB;
}
@@ -1189,7 +1223,7 @@ public final class FunctionRegistry {
Class<? extends UDAF> udafClass) {
FunctionInfo fi = new FunctionInfo(isNative,
functionName.toLowerCase(), new GenericUDAFBridge(
- (UDAF) ReflectionUtils.newInstance(udafClass, null)));
+ ReflectionUtils.newInstance(udafClass, null)));
mFunctions.put(functionName.toLowerCase(), fi);
// All aggregate functions should also be usable as window functions
@@ -1537,7 +1571,7 @@ public final class FunctionRegistry {
clonedUDF = new GenericUDFMacro(bridge.getMacroName(), bridge.getBody(),
bridge.getColNames(), bridge.getColTypes());
} else {
- clonedUDF = (GenericUDF) ReflectionUtils
+ clonedUDF = ReflectionUtils
.newInstance(genericUDF.getClass(), null);
}
@@ -1576,7 +1610,7 @@ public final class FunctionRegistry {
if (null == genericUDTF) {
return null;
}
- return (GenericUDTF) ReflectionUtils.newInstance(genericUDTF.getClass(),
+ return ReflectionUtils.newInstance(genericUDTF.getClass(),
null);
}
@@ -1701,17 +1735,17 @@ public final class FunctionRegistry {
/**
* Returns whether the exprNodeDesc is node of "cast".
*/
- private static boolean isOpCast(ExprNodeDesc desc) {
+ public static boolean isOpCast(ExprNodeDesc desc) {
if (!(desc instanceof ExprNodeGenericFuncDesc)) {
return false;
}
- GenericUDF genericUDF = ((ExprNodeGenericFuncDesc)desc).getGenericUDF();
- Class udfClass;
- if (genericUDF instanceof GenericUDFBridge) {
- udfClass = ((GenericUDFBridge)genericUDF).getUdfClass();
- } else {
- udfClass = genericUDF.getClass();
- }
+ return isOpCast(((ExprNodeGenericFuncDesc)desc).getGenericUDF());
+ }
+
+ public static boolean isOpCast(GenericUDF genericUDF) {
+ Class udfClass = (genericUDF instanceof GenericUDFBridge) ?
+ ((GenericUDFBridge)genericUDF).getUdfClass() : genericUDF.getClass();
+
return udfClass == UDFToBoolean.class || udfClass == UDFToByte.class ||
udfClass == UDFToDouble.class || udfClass == UDFToFloat.class ||
udfClass == UDFToInteger.class || udfClass == UDFToLong.class ||
@@ -1934,7 +1968,7 @@ public final class FunctionRegistry {
{
return getTableFunctionResolver(WINDOWING_TABLE_FUNCTION);
}
-
+
public static boolean isNoopFunction(String fnName) {
fnName = fnName.toLowerCase();
return fnName.equals(NOOP_MAP_TABLE_FUNCTION) ||
@@ -1958,17 +1992,18 @@ public final class FunctionRegistry {
* @return true if function is a UDAF, has WindowFunctionDescription annotation and the annotations
* confirms a ranking function, false otherwise
*/
- public static boolean isRankingFunction(String name){
+ public static boolean isRankingFunction(String name) {
FunctionInfo info = getFunctionInfo(name);
+ if (info == null) {
+ return false;
+ }
GenericUDAFResolver res = info.getGenericUDAFResolver();
- if (res != null){
- WindowFunctionDescription desc =
- AnnotationUtils.getAnnotation(res.getClass(), WindowFunctionDescription.class);
- if (desc != null){
- return desc.rankingFunction();
- }
+ if (res == null) {
+ return false;
}
- return false;
+ WindowFunctionDescription desc =
+ AnnotationUtils.getAnnotation(res.getClass(), WindowFunctionDescription.class);
+ return (desc != null) && desc.rankingFunction();
}
/**
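
The new getFunctionNamesByLikePattern splits the LIKE pattern on '|', rewrites '*' to '.*', compiles each sub-pattern case-insensitively, and silently skips malformed ones. A self-contained sketch of the same matching step over a made-up function list:

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class LikePatternSketch {
  static Set<String> filterByLikePattern(Set<String> names, String likePattern) {
    Set<String> matched = new TreeSet<String>();
    for (String sub : likePattern.trim().split("\\|")) {
      // Case-insensitive match; '*' in the LIKE pattern becomes '.*'.
      String regex = "(?i)" + sub.replaceAll("\\*", ".*");
      try {
        Pattern p = Pattern.compile(regex);
        for (String name : names) {
          if (p.matcher(name).matches()) {
            matched.add(name);
          }
        }
      } catch (PatternSyntaxException e) {
        // Ignore malformed sub-patterns, as the patched method does.
      }
    }
    return matched;
  }

  public static void main(String[] args) {
    Set<String> funcs = new TreeSet<String>(
        Arrays.asList("xpath", "xpath_int", "substr", "sum"));
    System.out.println(filterByLikePattern(funcs, "xpath*|su*"));
    // prints [substr, sum, xpath, xpath_int]
  }
}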
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Oct 30 16:22:33 2014
@@ -63,7 +63,7 @@ public class JoinOperator extends Common
skewJoinKeyContext.initiliaze(hconf);
skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
}
- statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
+ statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs);
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -171,9 +171,14 @@ public class MapJoinOperator extends Abs
private void loadHashTable() throws HiveException {
- if ((this.getExecContext() != null)
- && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext()
- .getLocalWork().getInputFileChangeSensitive()))) {
+ if ((this.getExecContext() == null)
+ || (this.getExecContext().getLocalWork() == null)
+ || (this.getExecContext().getLocalWork().getInputFileChangeSensitive() == false)
+ ) {
+ /*
+ * This early-exit criteria is not applicable if the local work is sensitive to input file changes.
+ * But the check does not apply if there is no local work, or if this is a reducer vertex (execContext is null).
+ */
if (hashTblInitedOnce) {
return;
} else {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Oct 30 16:22:33 2014
@@ -23,20 +23,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
/**
@@ -66,6 +67,7 @@ import org.apache.hadoop.util.StringUtil
* different from regular operators in that it starts off by processing a
* Writable data structure from a Table (instead of a Hive Object).
**/
+@SuppressWarnings("deprecation")
public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -75,60 +77,32 @@ public class MapOperator extends Operato
*
*/
public static enum Counter {
- DESERIALIZE_ERRORS
+ DESERIALIZE_ERRORS,
+ RECORDS_IN
}
private final transient LongWritable deserialize_error_count = new LongWritable();
-
- private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
- private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
- new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
-
- protected transient MapOpCtx current;
- private transient List<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
- private final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
-
- private static class MapInputPath {
- String path;
- String alias;
- Operator<?> op;
- PartitionDesc partDesc;
-
- /**
- * @param path
- * @param alias
- * @param op
- */
- public MapInputPath(String path, String alias, Operator<?> op, PartitionDesc partDesc) {
- this.path = path;
- this.alias = alias;
- this.op = op;
- this.partDesc = partDesc;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof MapInputPath) {
- MapInputPath mObj = (MapInputPath) o;
- return path.equals(mObj.path) && alias.equals(mObj.alias)
- && op.equals(mObj.op);
- }
-
- return false;
- }
-
- @Override
- public int hashCode() {
- int ret = (path == null) ? 0 : path.hashCode();
- ret += (alias == null) ? 0 : alias.hashCode();
- ret += (op == null) ? 0 : op.hashCode();
- return ret;
- }
- }
+ private final transient LongWritable recordCounter = new LongWritable();
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+
+ // input path --> {operator --> context}
+ private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
+ new HashMap<String, Map<Operator<?>, MapOpCtx>>();
+ // child operator --> object inspector (converted OI if it's needed)
+ private final Map<Operator<?>, StructObjectInspector> childrenOpToOI =
+ new HashMap<Operator<?>, StructObjectInspector>();
+
+ // context for current input file
+ protected transient MapOpCtx[] currentCtxs;
+ private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
protected static class MapOpCtx {
- StructObjectInspector tblRawRowObjectInspector; // columns
+ final String alias;
+ final Operator<?> op;
+ final PartitionDesc partDesc;
+
StructObjectInspector partObjectInspector; // partition columns
StructObjectInspector vcsObjectInspector; // virtual columns
StructObjectInspector rowObjectInspector;
@@ -144,6 +118,12 @@ public class MapOperator extends Operato
List<VirtualColumn> vcs;
Object[] vcValues;
+ public MapOpCtx(String alias, Operator<?> op, PartitionDesc partDesc) {
+ this.alias = alias;
+ this.op = op;
+ this.partDesc = partDesc;
+ }
+
private boolean isPartitioned() {
return partObjectInspector != null;
}
@@ -152,12 +132,30 @@ public class MapOperator extends Operato
return vcsObjectInspector != null;
}
- private Object readRow(Writable value) throws SerDeException {
- return partTblObjectInspectorConverter.convert(deserializer.deserialize(value));
+ private Object readRow(Writable value, ExecMapperContext context) throws SerDeException {
+ Object deserialized = deserializer.deserialize(value);
+ Object row = partTblObjectInspectorConverter.convert(deserialized);
+ if (hasVC()) {
+ rowWithPartAndVC[0] = row;
+ if (context != null) {
+ populateVirtualColumnValues(context, vcs, vcValues, deserializer);
+ }
+ int vcPos = isPartitioned() ? 2 : 1;
+ rowWithPartAndVC[vcPos] = vcValues;
+ return rowWithPartAndVC;
+ } else if (isPartitioned()) {
+ rowWithPart[0] = row;
+ return rowWithPart;
+ }
+ return row;
}
- public StructObjectInspector getRowObjectInspector() {
- return rowObjectInspector;
+ public boolean forward(Object row) throws HiveException {
+ if (op.getDone()) {
+ return false;
+ }
+ op.processOp(row, 0);
+ return true;
}
}
@@ -170,20 +168,19 @@ public class MapOperator extends Operato
* @param mapWork
* @throws HiveException
*/
- public void initializeAsRoot(Configuration hconf, MapWork mapWork)
- throws HiveException {
+ @VisibleForTesting
+ void initializeAsRoot(JobConf hconf, MapWork mapWork) throws Exception {
setConf(mapWork);
setChildren(hconf);
+ setExecContext(new ExecMapperContext(hconf));
initialize(hconf, null);
}
- private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
- Map<TableDesc, StructObjectInspector> convertedOI) throws Exception {
-
- PartitionDesc pd = ctx.partDesc;
+ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
+ StructObjectInspector tableRowOI) throws Exception {
+ PartitionDesc pd = opCtx.partDesc;
TableDesc td = pd.getTableDesc();
- MapOpCtx opCtx = new MapOpCtx();
// Use table properties in case of unpartitioned tables,
// and the union of table properties and partition properties, with partition
// taking precedence, in the case of partitioned tables
@@ -194,18 +191,13 @@ public class MapOperator extends Operato
opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
opCtx.partName = String.valueOf(partSpec);
-
- Class serdeclass = hconf.getClassByName(pd.getSerdeClassName());
- opCtx.deserializer = (Deserializer) serdeclass.newInstance();
- SerDeUtils.initializeSerDe(opCtx.deserializer, hconf, td.getProperties(), pd.getProperties());
+ opCtx.deserializer = pd.getDeserializer(hconf);
StructObjectInspector partRawRowObjectInspector =
(StructObjectInspector) opCtx.deserializer.getObjectInspector();
- opCtx.tblRawRowObjectInspector = convertedOI.get(td);
-
- opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
- partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
+ opCtx.partTblObjectInspectorConverter =
+ ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
// Next check if this table has partitions and if so
// get the list of partition names as well as allocate
@@ -253,8 +245,8 @@ public class MapOperator extends Operato
// The op may not be a TableScan for mapjoins
// Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key;
// In that case, it will be a Select, but the rowOI need not be amended
- if (ctx.op instanceof TableScanOperator) {
- TableScanOperator tsOp = (TableScanOperator) ctx.op;
+ if (opCtx.op instanceof TableScanOperator) {
+ TableScanOperator tsOp = (TableScanOperator) opCtx.op;
TableScanDesc tsDesc = tsOp.getConf();
if (tsDesc != null && tsDesc.hasVirtualCols()) {
opCtx.vcs = tsDesc.getVirtualCols();
@@ -268,11 +260,11 @@ public class MapOperator extends Operato
}
}
if (!opCtx.hasVC() && !opCtx.isPartitioned()) {
- opCtx.rowObjectInspector = opCtx.tblRawRowObjectInspector;
+ opCtx.rowObjectInspector = tableRowOI;
return opCtx;
}
List<StructObjectInspector> inspectors = new ArrayList<StructObjectInspector>();
- inspectors.add(opCtx.tblRawRowObjectInspector);
+ inspectors.add(tableRowOI);
if (opCtx.isPartitioned()) {
inspectors.add(opCtx.partObjectInspector);
}
@@ -302,19 +294,14 @@ public class MapOperator extends Operato
for (String onefile : conf.getPathToAliases().keySet()) {
PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
- Properties tblProps = tableDesc.getProperties();
- Class sdclass = hconf.getClassByName(pd.getSerdeClassName());
- Deserializer partDeserializer = (Deserializer) sdclass.newInstance();
- SerDeUtils.initializeSerDe(partDeserializer, hconf, tblProps, pd.getProperties());
- StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
- .getObjectInspector();
+ Deserializer partDeserializer = pd.getDeserializer(hconf);
+ StructObjectInspector partRawRowObjectInspector =
+ (StructObjectInspector) partDeserializer.getObjectInspector();
StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
if ((tblRawRowObjectInspector == null) ||
(identityConverterTableDesc.contains(tableDesc))) {
- sdclass = hconf.getClassByName(tableDesc.getSerdeClassName());
- Deserializer tblDeserializer = (Deserializer) sdclass.newInstance();
- SerDeUtils.initializeSerDe(tblDeserializer, hconf, tblProps, null);
+ Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
tblRawRowObjectInspector =
(StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
partRawRowObjectInspector,
@@ -338,155 +325,153 @@ public class MapOperator extends Operato
return tableDescOI;
}
- public void setChildren(Configuration hconf) throws HiveException {
- Path fpath = IOContext.get(hconf).getInputPath();
-
- boolean schemeless = fpath.toUri().getScheme() == null;
+ public void setChildren(Configuration hconf) throws Exception {
List<Operator<? extends OperatorDesc>> children =
new ArrayList<Operator<? extends OperatorDesc>>();
Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
- try {
- for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
- String onefile = entry.getKey();
- List<String> aliases = entry.getValue();
-
- Path onepath = new Path(onefile);
- if (schemeless) {
- onepath = new Path(onepath.toUri().getPath());
- }
-
- PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
-
- for (String onealias : aliases) {
- Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding alias " + onealias + " to work list for file "
- + onefile);
- }
- MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
- if (opCtxMap.containsKey(inp)) {
- continue;
- }
- MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI);
- opCtxMap.put(inp, opCtx);
+ for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+ String onefile = entry.getKey();
+ List<String> aliases = entry.getValue();
+ PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
+
+ for (String alias : aliases) {
+ Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
+ if (isLogDebugEnabled) {
+ LOG.debug("Adding alias " + alias + " to work list for file "
+ + onefile);
+ }
+ Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile);
+ if (contexts == null) {
+ opCtxMap.put(onefile, contexts = new LinkedHashMap<Operator<?>, MapOpCtx>());
+ }
+ if (contexts.containsKey(op)) {
+ continue;
+ }
+ MapOpCtx context = new MapOpCtx(alias, op, partDesc);
+ StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc());
+ contexts.put(op, initObjectInspector(hconf, context, tableRowOI));
- op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+ if (!children.contains(op)) {
+ op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));
op.getParentOperators().add(this);
- // check for the operators who will process rows coming to this Map
- // Operator
- if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- children.add(op);
- childrenOpToOpCtxMap.put(op, opCtx);
- LOG.info("dump " + op + " "
- + opCtxMap.get(inp).rowObjectInspector.getTypeName());
- }
- current = opCtx; // just need for TestOperators.testMapOperator
+ children.add(op);
}
}
+ }
+
+ initOperatorContext(children);
- if (children.size() == 0) {
- // didn't find match for input file path in configuration!
- // serious problem ..
- LOG.error("Configuration does not have any alias for path: "
- + fpath.toUri());
- throw new HiveException("Configuration and input path are inconsistent");
+ // we found all the operators that we are supposed to process.
+ setChildOperators(children);
+ }
+
+ private void initOperatorContext(List<Operator<? extends OperatorDesc>> children)
+ throws HiveException {
+ for (Map<Operator<?>, MapOpCtx> contexts : opCtxMap.values()) {
+ for (MapOpCtx context : contexts.values()) {
+ if (!children.contains(context.op)) {
+ continue;
+ }
+ StructObjectInspector prev =
+ childrenOpToOI.put(context.op, context.rowObjectInspector);
+ if (prev != null && !prev.equals(context.rowObjectInspector)) {
+ throw new HiveException("Conflict on row inspector for " + context.alias);
+ }
+ if (isLogDebugEnabled) {
+ LOG.debug("dump " + context.op + " " + context.rowObjectInspector.getTypeName());
+ }
}
+ }
+ }
- // we found all the operators that we are supposed to process.
- setChildOperators(children);
- } catch (Exception e) {
- throw new HiveException(e);
+ private String getNominalPath(Path fpath) {
+ String nominal = null;
+ boolean schemaless = fpath.toUri().getScheme() == null;
+ for (String onefile : conf.getPathToAliases().keySet()) {
+ Path onepath = normalizePath(onefile, schemaless);
+ // check for the root operators that will process rows coming to this MapOperator
+ if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+ // this root path does not contain the current input file
+ continue;
+ }
+ if (nominal != null) {
+ throw new IllegalStateException("Ambiguous input path " + fpath);
+ }
+ nominal = onefile;
+ }
+ if (nominal == null) {
+ throw new IllegalStateException("Invalid input path " + fpath);
}
+ return nominal;
}
@Override
public void initializeOp(Configuration hconf) throws HiveException {
// set that parent initialization is done and call initialize on children
state = State.INIT;
- statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
+ statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
- List<Operator<? extends OperatorDesc>> children = getChildOperators();
+ numRows = 0;
- for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
- .entrySet()) {
- Operator<? extends OperatorDesc> child = entry.getKey();
- MapOpCtx mapOpCtx = entry.getValue();
- // Add alias, table name, and partitions to hadoop conf so that their
- // children will inherit these
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
- child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowObjectInspector});
- }
-
- for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
- MapInputPath input = entry.getKey();
- MapOpCtx mapOpCtx = entry.getValue();
- // Add alias, table name, and partitions to hadoop conf so that their
- // children will inherit these
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-
- Operator<? extends OperatorDesc> op = input.op;
- if (children.indexOf(op) == -1) {
- // op is not in the children list, so need to remember it and close it afterwards
- if (extraChildrenToClose == null) {
- extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
- }
- extraChildrenToClose.add(op);
- op.initialize(hconf, new ObjectInspector[] {entry.getValue().rowObjectInspector});
- }
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_IN + context, recordCounter);
+
+ for (Entry<Operator<?>, StructObjectInspector> entry : childrenOpToOI.entrySet()) {
+ Operator<?> child = entry.getKey();
+ child.initialize(hconf, new ObjectInspector[] {entry.getValue()});
}
}
- /**
- * close extra child operators that are initialized but are not executed.
- */
@Override
public void closeOp(boolean abort) throws HiveException {
- if (extraChildrenToClose != null) {
- for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
- op.close(abort);
- }
- }
+ recordCounter.set(numRows);
+ super.closeOp(abort);
}
// Find context for current input file
@Override
public void cleanUpInputFileChangedOp() throws HiveException {
+ super.cleanUpInputFileChangedOp();
Path fpath = getExecContext().getCurrentInputPath();
-
- for (String onefile : conf.getPathToAliases().keySet()) {
- Path onepath = normalizePath(onefile);
- // check for the operators who will process rows coming to this Map
- // Operator
- if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- // not from this
- continue;
- }
- PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
- for (String onealias : conf.getPathToAliases().get(onefile)) {
- Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
- MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
- MapOpCtx context = opCtxMap.get(inp);
- if (context != null) {
- current = context;
- LOG.info("Processing alias " + onealias + " for file " + onefile);
- return;
+ String nominalPath = getNominalPath(fpath);
+ Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+ if (isLogInfoEnabled) {
+ StringBuilder builder = new StringBuilder();
+ for (MapOpCtx context : contexts.values()) {
+ if (builder.length() > 0) {
+ builder.append(", ");
}
+ builder.append(context.alias);
+ }
+ LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath);
}
- throw new IllegalStateException("Invalid path " + fpath);
+ // Propagate alias, table name and partition name down the operator tree
+ // so that child operators inherit the current input context
+ for (Entry<Operator<?>, MapOpCtx> entry : contexts.entrySet()) {
+ Operator<?> operator = entry.getKey();
+ MapOpCtx context = entry.getValue();
+ operator.setInputContext(nominalPath, context.tableName, context.partName);
+ }
+ currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
}
- private Path normalizePath(String onefile) {
+ private Path normalizePath(String onefile, boolean schemaless) {
//creating Path is expensive, so cache the corresponding
//Path object in normalizedPaths
Path path = normalizedPaths.get(onefile);
- if(path == null){
+ if (path == null) {
path = new Path(onefile);
+ if (schemaless && path.toUri().getScheme() != null) {
+ path = new Path(path.toUri().getPath());
+ }
normalizedPaths.put(onefile, path);
}
return path;
@@ -500,50 +485,48 @@ public class MapOperator extends Operato
// The child operators cleanup if input file has changed
cleanUpInputFileChanged();
}
- Object row;
- try {
- row = current.readRow(value);
- if (current.hasVC()) {
- current.rowWithPartAndVC[0] = row;
- if (context != null) {
- populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer);
- }
- int vcPos = current.isPartitioned() ? 2 : 1;
- current.rowWithPartAndVC[vcPos] = current.vcValues;
- row = current.rowWithPartAndVC;
- } else if (current.isPartitioned()) {
- current.rowWithPart[0] = row;
- row = current.rowWithPart;
- }
- } catch (Exception e) {
- // Serialize the row and output.
- String rawRowString;
+ int childrenDone = 0;
+ for (MapOpCtx current : currentCtxs) {
+ Object row = null;
try {
- rawRowString = value.toString();
- } catch (Exception e2) {
- rawRowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
+ row = current.readRow(value, context);
+ if (!current.forward(row)) {
+ childrenDone++;
+ }
+ } catch (Exception e) {
+ // TODO: policy on deserialization errors
+ String message = toErrorMessage(value, row, current.rowObjectInspector);
+ if (row == null) {
+ deserialize_error_count.set(deserialize_error_count.get() + 1);
+ throw new HiveException("Hive Runtime Error while processing writable " + message, e);
+ }
+ throw new HiveException("Hive Runtime Error while processing row " + message, e);
}
+ }
+ rowsForwarded(childrenDone, 1);
+ }
- // TODO: policy on deserialization errors
- deserialize_error_count.set(deserialize_error_count.get() + 1);
- throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
+ protected final void rowsForwarded(int childrenDone, int rows) {
+ numRows += rows;
+ if (isLogInfoEnabled) {
+ while (numRows >= cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records read - " + numRows);
+ }
}
+ if (childrenDone == currentCtxs.length) {
+ setDone(true);
+ }
+ }
- // The row has been converted to comply with table schema, irrespective of partition schema.
- // So, use tblOI (and not partOI) for forwarding
+ private String toErrorMessage(Writable value, Object row, ObjectInspector inspector) {
try {
- forward(row, current.rowObjectInspector);
- } catch (Exception e) {
- // Serialize the row and output the error message.
- String rowString;
- try {
- rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector);
- } catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
+ if (row != null) {
+ return SerDeUtils.getJSONString(row, inspector);
}
- throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
+ return String.valueOf(value);
+ } catch (Exception e) {
+ return "[Error getting row data with exception " + StringUtils.stringifyException(e) + " ]";
}
}
@@ -639,4 +622,16 @@ public class MapOperator extends Operato
public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
return MapRecordProcessor.getConnectOps();
}
+
+ public void initializeContexts() {
+ Path fpath = getExecContext().getCurrentInputPath();
+ String nominalPath = getNominalPath(fpath);
+ Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+ currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
+ }
+
+ public Deserializer getCurrentDeserializer() {
+ return currentCtxs[0].deserializer;
+ }
}
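A minimal sketch of the path-containment test behind getNominalPath() (and the old setChildren() logic), using plain java.net.URI rather than Hadoop's Path, which is an assumption for brevity; Hive calls Path.toUri() before relativizing:

import java.net.URI;

public class NominalPathSketch {
  // Mirrors onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri()):
  // URI.relativize() returns its argument unchanged when the argument does not
  // live under this URI, so equality with the argument means "not contained".
  static boolean isUnder(URI root, URI file) {
    return !root.relativize(file).equals(file);
  }

  public static void main(String[] args) {
    URI root = URI.create("hdfs://nn:8020/warehouse/t1/");
    System.out.println(isUnder(root, URI.create("hdfs://nn:8020/warehouse/t1/000000_0"))); // true
    System.out.println(isUnder(root, URI.create("hdfs://nn:8020/warehouse/t2/000000_0"))); // false
  }
}

getNominalPath() applies this check to every configured root and fails fast when no root, or more than one root, claims the current input file.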
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Oct 30 16:22:33 2014
@@ -61,6 +61,7 @@ public abstract class Operator<T extends
public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+ public static final String CONTEXT_NAME_KEY = "__hive.context.name";
private transient Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -210,11 +211,14 @@ public abstract class Operator<T extends
// non-bean ..
- protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+ protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
@SuppressWarnings("rawtypes")
protected transient OutputCollector out;
- protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
- protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
+ protected transient final Log LOG = LogFactory.getLog(getClass().getName());
+ protected transient final Log PLOG = LogFactory.getLog(Operator.class.getName()); // allows disabling logging from all operators at once
+ protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
+ protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
+ protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
protected transient String alias;
protected transient Reporter reporter;
protected transient String id;
@@ -287,9 +291,9 @@ public abstract class Operator<T extends
}
}
- public Map<Enum<?>, Long> getStats() {
- HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
- for (Enum<?> one : statsMap.keySet()) {
+ public Map<String, Long> getStats() {
+ HashMap<String, Long> ret = new HashMap<String, Long>();
+ for (String one : statsMap.keySet()) {
ret.put(one, Long.valueOf(statsMap.get(one).get()));
}
return (ret);
@@ -490,33 +494,45 @@ public abstract class Operator<T extends
public abstract void processOp(Object row, int tag) throws HiveException;
protected final void defaultStartGroup() throws HiveException {
- LOG.debug("Starting group");
+ if (isLogDebugEnabled) {
+ LOG.debug("Starting group");
+ }
if (childOperators == null) {
return;
}
- LOG.debug("Starting group for children:");
+ if (isLogDebugEnabled) {
+ LOG.debug("Starting group for children:");
+ }
for (Operator<? extends OperatorDesc> op : childOperators) {
op.startGroup();
}
- LOG.debug("Start group Done");
+ if (isLogDebugEnabled) {
+ LOG.debug("Start group Done");
+ }
}
protected final void defaultEndGroup() throws HiveException {
- LOG.debug("Ending group");
+ if (isLogDebugEnabled) {
+ LOG.debug("Ending group");
+ }
if (childOperators == null) {
return;
}
- LOG.debug("Ending group for children:");
+ if (isLogDebugEnabled) {
+ LOG.debug("Ending group for children:");
+ }
for (Operator<? extends OperatorDesc> op : childOperators) {
op.endGroup();
}
- LOG.debug("End group Done");
+ if (isLogDebugEnabled) {
+ LOG.debug("End group Done");
+ }
}
// If an operator wants to do some work at the beginning of a group
@@ -807,7 +823,7 @@ public abstract class Operator<T extends
}
public void resetStats() {
- for (Enum<?> e : statsMap.keySet()) {
+ for (String e : statsMap.keySet()) {
statsMap.get(e).set(0L);
}
}
@@ -840,7 +856,7 @@ public abstract class Operator<T extends
}
public void logStats() {
- for (Enum<?> e : statsMap.keySet()) {
+ for (String e : statsMap.keySet()) {
LOG.info(e.toString() + ":" + statsMap.get(e).toString());
}
}
@@ -1046,6 +1062,17 @@ public abstract class Operator<T extends
public void cleanUpInputFileChangedOp() throws HiveException {
}
+ // Called by MapOperator; propagated recursively to single-parented descendants
+ public void setInputContext(String inputPath, String tableName, String partitionName) {
+ if (childOperators != null) {
+ for (Operator<? extends OperatorDesc> child : childOperators) {
+ if (child.getNumParent() == 1) {
+ child.setInputContext(inputPath, tableName, partitionName);
+ }
+ }
+ }
+ }
+
public boolean supportSkewJoinOptimization() {
return false;
}
@@ -1263,7 +1290,7 @@ public abstract class Operator<T extends
}
public void setOpTraits(OpTraits metaInfo) {
- if (LOG.isDebugEnabled()) {
+ if (isLogDebugEnabled) {
LOG.debug("Setting traits ("+metaInfo+") on "+this);
}
if (conf != null) {
@@ -1274,7 +1301,7 @@ public abstract class Operator<T extends
}
public void setStatistics(Statistics stats) {
- if (LOG.isDebugEnabled()) {
+ if (isLogDebugEnabled) {
LOG.debug("Setting stats ("+stats+") on "+this);
}
if (conf != null) {
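Since statsMap is now keyed by String, MapOperator.initializeOp() can register a per-context RECORDS_IN counter by suffixing the value of CONTEXT_NAME_KEY. A minimal sketch of that name derivation (the sample context value "Map 1" is an assumption, not taken from this commit):

import java.util.Properties;

public class CounterNameSketch {
  // Same derivation as MapOperator.initializeOp(): optional context name,
  // spaces replaced by underscores, prefixed with '_' and appended to RECORDS_IN.
  static String recordsInCounter(Properties conf) {
    String context = conf.getProperty("__hive.context.name", "");
    if (!context.isEmpty()) {
      context = "_" + context.replace(" ", "_");
    }
    return "RECORDS_IN" + context;
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    System.out.println(recordsInCounter(conf));           // RECORDS_IN
    conf.setProperty("__hive.context.name", "Map 1");
    System.out.println(recordsInCounter(conf));           // RECORDS_IN_Map_1
  }
}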
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Thu Oct 30 16:22:33 2014
@@ -143,4 +143,8 @@ public class OperatorUtils {
}
}
}
+
+ public static boolean sameRowSchema(Operator<?> operator1, Operator<?> operator2) {
+ return operator1.getSchema().equals(operator2.getSchema());
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Thu Oct 30 16:22:33 2014
@@ -64,6 +64,7 @@ public class OrcFileMergeOperator extend
private void processKeyValuePairs(Object key, Object value)
throws HiveException {
+ String filePath = "";
try {
OrcFileValueWrapper v;
OrcFileKeyWrapper k;
@@ -72,6 +73,7 @@ public class OrcFileMergeOperator extend
} else {
k = (OrcFileKeyWrapper) key;
}
+ filePath = k.getInputPath().toUri().getPath();
fixTmpPath(k.getInputPath().getParent());
@@ -131,6 +133,16 @@ public class OrcFileMergeOperator extend
this.exception = true;
closeOp(true);
throw new HiveException(e);
+ } finally {
+ if (fdis != null) {
+ try {
+ fdis.close();
+ } catch (IOException e) {
+ throw new HiveException(String.format("Unable to close file %s", filePath), e);
+ } finally {
+ fdis = null;
+ }
+ }
}
}
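The new finally block in processKeyValuePairs() closes the input stream exactly once per key/value pair and resets the field so a later call cannot double-close it. A minimal sketch of the same close-and-reset idiom with a generic Closeable (the field and method names here are illustrative, not Hive's):

import java.io.Closeable;
import java.io.IOException;

public class CloseOnceSketch {
  private Closeable stream;       // stands in for the operator's fdis field

  void process(Closeable opened) throws IOException {
    stream = opened;
    try {
      // ... consume the stream ...
    } finally {
      if (stream != null) {
        try {
          stream.close();
        } finally {
          stream = null;          // prevent a second close on the next call
        }
      }
    }
  }
}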
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Oct 30 16:22:33 2014
@@ -337,17 +337,20 @@ public class PTFOperator extends Operato
handleOutputRows(tabFn.finishPartition());
} else {
if ( tabFn.canIterateOutput() ) {
- outputPartRowsItr = tabFn.iterator(inputPart.iterator());
+ outputPartRowsItr = inputPart == null ? null :
+ tabFn.iterator(inputPart.iterator());
} else {
- outputPart = tabFn.execute(inputPart);
- outputPartRowsItr = outputPart.iterator();
+ outputPart = inputPart == null ? null : tabFn.execute(inputPart);
+ outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
}
if ( next != null ) {
if (!next.isStreaming() && !isOutputIterator() ) {
next.inputPart = outputPart;
} else {
- while(outputPartRowsItr.hasNext() ) {
- next.processRow(outputPartRowsItr.next());
+ if ( outputPartRowsItr != null ) {
+ while(outputPartRowsItr.hasNext() ) {
+ next.processRow(outputPartRowsItr.next());
+ }
}
}
}
@@ -357,8 +360,10 @@ public class PTFOperator extends Operato
next.finishPartition();
} else {
if (!isStreaming() ) {
- while(outputPartRowsItr.hasNext() ) {
- forward(outputPartRowsItr.next(), outputObjInspector);
+ if ( outputPartRowsItr != null ) {
+ while(outputPartRowsItr.hasNext() ) {
+ forward(outputPartRowsItr.next(), outputObjInspector);
+ }
}
}
}