Posted to commits@hive.apache.org by se...@apache.org on 2015/01/23 20:59:24 UTC
svn commit: r1654355 [11/27] - in /hive/branches/llap: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/con...
Modified: hive/branches/llap/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java (original)
+++ hive/branches/llap/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java Fri Jan 23 19:59:11 2015
@@ -4796,6 +4796,16 @@ public final class OrcProto {
* <code>optional .org.apache.hadoop.hive.ql.io.orc.TimestampStatistics timestampStatistics = 9;</code>
*/
org.apache.hadoop.hive.ql.io.orc.OrcProto.TimestampStatisticsOrBuilder getTimestampStatisticsOrBuilder();
+
+ // optional bool hasNull = 10;
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ boolean hasHasNull();
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ boolean getHasNull();
}
/**
* Protobuf type {@code org.apache.hadoop.hive.ql.io.orc.ColumnStatistics}
@@ -4957,6 +4967,11 @@ public final class OrcProto {
bitField0_ |= 0x00000100;
break;
}
+ case 80: {
+ bitField0_ |= 0x00000200;
+ hasNull_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5189,6 +5204,22 @@ public final class OrcProto {
return timestampStatistics_;
}
+ // optional bool hasNull = 10;
+ public static final int HASNULL_FIELD_NUMBER = 10;
+ private boolean hasNull_;
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public boolean hasHasNull() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public boolean getHasNull() {
+ return hasNull_;
+ }
+
private void initFields() {
numberOfValues_ = 0L;
intStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.IntegerStatistics.getDefaultInstance();
@@ -5199,6 +5230,7 @@ public final class OrcProto {
dateStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.DateStatistics.getDefaultInstance();
binaryStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.BinaryStatistics.getDefaultInstance();
timestampStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.TimestampStatistics.getDefaultInstance();
+ hasNull_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5239,6 +5271,9 @@ public final class OrcProto {
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeMessage(9, timestampStatistics_);
}
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ output.writeBool(10, hasNull_);
+ }
getUnknownFields().writeTo(output);
}
@@ -5284,6 +5319,10 @@ public final class OrcProto {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(9, timestampStatistics_);
}
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(10, hasNull_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5458,6 +5497,8 @@ public final class OrcProto {
timestampStatisticsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000100);
+ hasNull_ = false;
+ bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
@@ -5554,6 +5595,10 @@ public final class OrcProto {
} else {
result.timestampStatistics_ = timestampStatisticsBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+ to_bitField0_ |= 0x00000200;
+ }
+ result.hasNull_ = hasNull_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5597,6 +5642,9 @@ public final class OrcProto {
if (other.hasTimestampStatistics()) {
mergeTimestampStatistics(other.getTimestampStatistics());
}
+ if (other.hasHasNull()) {
+ setHasNull(other.getHasNull());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6593,6 +6641,39 @@ public final class OrcProto {
return timestampStatisticsBuilder_;
}
+ // optional bool hasNull = 10;
+ private boolean hasNull_ ;
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public boolean hasHasNull() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public boolean getHasNull() {
+ return hasNull_;
+ }
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public Builder setHasNull(boolean value) {
+ bitField0_ |= 0x00000200;
+ hasNull_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool hasNull = 10;</code>
+ */
+ public Builder clearHasNull() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ hasNull_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.apache.hadoop.hive.ql.io.orc.ColumnStatistics)
}
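
[Editor's note] The new field follows the stock protobuf accessor pattern: hasHasNull() tests presence, getHasNull() reads the value, and the builder gains setHasNull()/clearHasNull(). A minimal sketch of a caller against the generated API above (the statistics values are illustrative, not from this commit):

    OrcProto.ColumnStatistics stats = OrcProto.ColumnStatistics.newBuilder()
        .setNumberOfValues(1000L)
        .setHasNull(true)      // this column contains at least one NULL
        .build();
    if (stats.hasHasNull() && stats.getHasNull()) {
      // the column is known to contain NULLs, so a reader must keep
      // its null handling; absent or false means it can be skipped
    }
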
@@ -17657,7 +17738,7 @@ public final class OrcProto {
"\"2\n\016DateStatistics\022\017\n\007minimum\030\001 \001(\021\022\017\n\007m",
"aximum\030\002 \001(\021\"7\n\023TimestampStatistics\022\017\n\007m" +
"inimum\030\001 \001(\022\022\017\n\007maximum\030\002 \001(\022\"\037\n\020BinaryS" +
- "tatistics\022\013\n\003sum\030\001 \001(\022\"\234\005\n\020ColumnStatist" +
+ "tatistics\022\013\n\003sum\030\001 \001(\022\"\255\005\n\020ColumnStatist" +
"ics\022\026\n\016numberOfValues\030\001 \001(\004\022J\n\rintStatis" +
"tics\030\002 \001(\01323.org.apache.hadoop.hive.ql.i" +
"o.orc.IntegerStatistics\022L\n\020doubleStatist" +
@@ -17674,60 +17755,60 @@ public final class OrcProto {
"org.apache.hadoop.hive.ql.io.orc.BinaryS" +
"tatistics\022R\n\023timestampStatistics\030\t \001(\01325" +
".org.apache.hadoop.hive.ql.io.orc.Timest" +
- "ampStatistics\"n\n\rRowIndexEntry\022\025\n\tpositi",
- "ons\030\001 \003(\004B\002\020\001\022F\n\nstatistics\030\002 \001(\01322.org." +
- "apache.hadoop.hive.ql.io.orc.ColumnStati" +
- "stics\"J\n\010RowIndex\022>\n\005entry\030\001 \003(\0132/.org.a" +
- "pache.hadoop.hive.ql.io.orc.RowIndexEntr" +
- "y\"\331\001\n\006Stream\022;\n\004kind\030\001 \002(\0162-.org.apache." +
- "hadoop.hive.ql.io.orc.Stream.Kind\022\016\n\006col" +
- "umn\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Kind\022\013\n\007PRE" +
- "SENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n\017DICTIONA" +
- "RY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004\022\r\n\tSECON" +
- "DARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n\016ColumnEncoding",
- "\022C\n\004kind\030\001 \002(\01625.org.apache.hadoop.hive." +
- "ql.io.orc.ColumnEncoding.Kind\022\026\n\016diction" +
- "arySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006DIRECT\020\000\022\016\n\nDIC" +
- "TIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022\021\n\rDICTIONARY_V" +
- "2\020\003\"\214\001\n\014StripeFooter\0229\n\007streams\030\001 \003(\0132(." +
- "org.apache.hadoop.hive.ql.io.orc.Stream\022" +
- "A\n\007columns\030\002 \003(\01320.org.apache.hadoop.hiv" +
- "e.ql.io.orc.ColumnEncoding\"\370\002\n\004Type\0229\n\004k" +
- "ind\030\001 \002(\0162+.org.apache.hadoop.hive.ql.io" +
- ".orc.Type.Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\n",
- "fieldNames\030\003 \003(\t\022\025\n\rmaximumLength\030\004 \001(\r\022" +
- "\021\n\tprecision\030\005 \001(\r\022\r\n\005scale\030\006 \001(\r\"\321\001\n\004Ki" +
- "nd\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003" +
- "INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n" +
- "\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n" +
- "\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022" +
- "\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\022\013\n\007VARCHAR\020\020\022\010\n\004C" +
- "HAR\020\021\"x\n\021StripeInformation\022\016\n\006offset\030\001 \001" +
- "(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLength\030\003 " +
- "\001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014numberOfRow",
- "s\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name\030\001 \002(" +
- "\t\022\r\n\005value\030\002 \002(\014\"X\n\020StripeStatistics\022D\n\010" +
- "colStats\030\001 \003(\01322.org.apache.hadoop.hive." +
- "ql.io.orc.ColumnStatistics\"S\n\010Metadata\022G" +
- "\n\013stripeStats\030\001 \003(\01322.org.apache.hadoop." +
- "hive.ql.io.orc.StripeStatistics\"\356\002\n\006Foot" +
- "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" +
- "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" +
- "doop.hive.ql.io.orc.StripeInformation\0225\n" +
- "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql",
- ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" +
- "che.hadoop.hive.ql.io.orc.UserMetadataIt" +
- "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007" +
- " \003(\01322.org.apache.hadoop.hive.ql.io.orc." +
- "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" +
- "\"\334\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" +
- "\013compression\030\002 \001(\01621.org.apache.hadoop.h" +
- "ive.ql.io.orc.CompressionKind\022\034\n\024compres" +
- "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" +
- "\022\026\n\016metadataLength\030\005 \001(\004\022\025\n\rwriterVersio",
- "n\030\006 \001(\r\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKi" +
- "nd\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZ" +
- "O\020\003"
+ "ampStatistics\022\017\n\007hasNull\030\n \001(\010\"n\n\rRowInd",
+ "exEntry\022\025\n\tpositions\030\001 \003(\004B\002\020\001\022F\n\nstatis" +
+ "tics\030\002 \001(\01322.org.apache.hadoop.hive.ql.i" +
+ "o.orc.ColumnStatistics\"J\n\010RowIndex\022>\n\005en" +
+ "try\030\001 \003(\0132/.org.apache.hadoop.hive.ql.io" +
+ ".orc.RowIndexEntry\"\331\001\n\006Stream\022;\n\004kind\030\001 " +
+ "\002(\0162-.org.apache.hadoop.hive.ql.io.orc.S" +
+ "tream.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001" +
+ "(\004\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LEN" +
+ "GTH\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY" +
+ "_COUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263",
+ "\001\n\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.ap" +
+ "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
+ "g.Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n" +
+ "\n\006DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020" +
+ "\002\022\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n" +
+ "\007streams\030\001 \003(\0132(.org.apache.hadoop.hive." +
+ "ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org." +
+ "apache.hadoop.hive.ql.io.orc.ColumnEncod" +
+ "ing\"\370\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache." +
+ "hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010subty",
+ "pes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\022\025\n\rmax" +
+ "imumLength\030\004 \001(\r\022\021\n\tprecision\030\005 \001(\r\022\r\n\005s" +
+ "cale\030\006 \001(\r\"\321\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE" +
+ "\020\001\022\t\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOA" +
+ "T\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022" +
+ "\r\n\tTIMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STR" +
+ "UCT\020\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\022" +
+ "\013\n\007VARCHAR\020\020\022\010\n\004CHAR\020\021\"x\n\021StripeInformat" +
+ "ion\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004" +
+ "\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001",
+ "(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadata" +
+ "Item\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"X\n\020Str" +
+ "ipeStatistics\022D\n\010colStats\030\001 \003(\01322.org.ap" +
+ "ache.hadoop.hive.ql.io.orc.ColumnStatist" +
+ "ics\"S\n\010Metadata\022G\n\013stripeStats\030\001 \003(\01322.o" +
+ "rg.apache.hadoop.hive.ql.io.orc.StripeSt" +
+ "atistics\"\356\002\n\006Footer\022\024\n\014headerLength\030\001 \001(" +
+ "\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(" +
+ "\01323.org.apache.hadoop.hive.ql.io.orc.Str" +
+ "ipeInformation\0225\n\005types\030\004 \003(\0132&.org.apac",
+ "he.hadoop.hive.ql.io.orc.Type\022D\n\010metadat" +
+ "a\030\005 \003(\01322.org.apache.hadoop.hive.ql.io.o" +
+ "rc.UserMetadataItem\022\024\n\014numberOfRows\030\006 \001(" +
+ "\004\022F\n\nstatistics\030\007 \003(\01322.org.apache.hadoo" +
+ "p.hive.ql.io.orc.ColumnStatistics\022\026\n\016row" +
+ "IndexStride\030\010 \001(\r\"\334\001\n\nPostScript\022\024\n\014foot" +
+ "erLength\030\001 \001(\004\022F\n\013compression\030\002 \001(\01621.or" +
+ "g.apache.hadoop.hive.ql.io.orc.Compressi" +
+ "onKind\022\034\n\024compressionBlockSize\030\003 \001(\004\022\023\n\007" +
+ "version\030\004 \003(\rB\002\020\001\022\026\n\016metadataLength\030\005 \001(",
+ "\004\022\025\n\rwriterVersion\030\006 \001(\r\022\016\n\005magic\030\300> \001(\t" +
+ "*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022" +
+ "\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17787,7 +17868,7 @@ public final class OrcProto {
internal_static_org_apache_hadoop_hive_ql_io_orc_ColumnStatistics_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_apache_hadoop_hive_ql_io_orc_ColumnStatistics_descriptor,
- new java.lang.String[] { "NumberOfValues", "IntStatistics", "DoubleStatistics", "StringStatistics", "BucketStatistics", "DecimalStatistics", "DateStatistics", "BinaryStatistics", "TimestampStatistics", });
+ new java.lang.String[] { "NumberOfValues", "IntStatistics", "DoubleStatistics", "StringStatistics", "BucketStatistics", "DecimalStatistics", "DateStatistics", "BinaryStatistics", "TimestampStatistics", "HasNull", });
internal_static_org_apache_hadoop_hive_ql_io_orc_RowIndexEntry_descriptor =
getDescriptor().getMessageTypes().get(9);
internal_static_org_apache_hadoop_hive_ql_io_orc_RowIndexEntry_fieldAccessorTable = new
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java Fri Jan 23 19:59:11 2015
@@ -90,7 +90,7 @@ public class Context {
protected int tryCount = 0;
private TokenRewriteStream tokenRewriteStream;
- String executionId;
+ private String executionId;
// List of Locks for this query
protected List<HiveLock> hiveLocks;
@@ -112,6 +112,8 @@ public class Context {
private final Map<WriteEntity, List<HiveLockObj>> outputLockObjects =
new HashMap<WriteEntity, List<HiveLockObj>>();
+ private final String stagingDir;
+
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
@@ -129,6 +131,7 @@ public class Context {
nonLocalScratchPath = new Path(SessionState.getHDFSSessionPath(conf), executionId);
localScratchDir = new Path(SessionState.getLocalSessionPath(conf), executionId).toUri().getPath();
scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
+ stagingDir = HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR);
}
@@ -188,6 +191,65 @@ public class Context {
}
/**
+ * Gets a temporary staging directory related to a path.
+ * If a path already contains a staging directory, then returns the current directory; otherwise
+ * create the directory if needed.
+ *
+ * @param inputPath URI of the temporary directory
+ * @param mkdir Create the directory if True.
+ * @return A temporary path.
+ */
+ private Path getStagingDir(Path inputPath, boolean mkdir) {
+ final URI inputPathUri = inputPath.toUri();
+ final String inputPathName = inputPathUri.getPath();
+ final String fileSystem = inputPathUri.getScheme() + ":" + inputPathUri.getAuthority();
+ final FileSystem fs;
+
+ try {
+ fs = inputPath.getFileSystem(conf);
+ } catch (IOException e) {
+ throw new IllegalStateException("Error getting FileSystem for " + inputPath + ": "+ e, e);
+ }
+
+ String stagingPathName;
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ stagingPathName = new Path(inputPathName, stagingDir).toString();
+ } else {
+ stagingPathName = inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length());
+ }
+
+ final String key = fileSystem + "-" + stagingPathName + "-" + TaskRunner.getTaskRunnerID();
+
+ Path dir = fsScratchDirs.get(key);
+ if (dir == null) {
+ // Append task specific info to stagingPathName, instead of creating a sub-directory.
+ // This way we don't have to worry about deleting the stagingPathName separately at
+ // end of query execution.
+ dir = fs.makeQualified(new Path(stagingPathName + "_" + this.executionId + "-" + TaskRunner.getTaskRunnerID()));
+
+ LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
+
+ if (mkdir) {
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, conf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString() + "'");
+ }
+
+ if (isHDFSCleanup) {
+ fs.deleteOnExit(dir);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create staging directory '" + dir.toString() + "': " + e.getMessage(), e);
+ }
+ }
+
+ fsScratchDirs.put(key, dir);
+ }
+
+ return dir;
+ }
+
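[Editor's note] To make the key and path construction concrete with hypothetical values: for inputPath hdfs://nn:8020/warehouse/t, a stagingDir of ".hive-staging", and task runner id 1, stagingPathName becomes /warehouse/t/.hive-staging, the cache key is "hdfs:nn:8020-/warehouse/t/.hive-staging-1", and the directory created is hdfs://nn:8020/warehouse/t/.hive-staging_<executionId>-1.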
+ /**
* Get a tmp directory on specified URI
*
* @param scheme Scheme of the target FS
@@ -274,14 +336,13 @@ public class Context {
}
private Path getExternalScratchDir(URI extURI) {
- return getScratchDir(extURI.getScheme(), extURI.getAuthority(),
- !explain, nonLocalScratchPath.toUri().getPath());
+ return getStagingDir(new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()), !explain);
}
/**
* Remove any created scratch directories.
*/
- private void removeScratchDir() {
+ public void removeScratchDir() {
for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
try {
Path p = entry.getValue();
@@ -313,6 +374,10 @@ public class Context {
(uriStr.indexOf(MR_PREFIX) != -1);
}
+ public Path getMRTmpPath(URI uri) {
+ return new Path(getStagingDir(new Path(uri), !explain), MR_PREFIX + nextPathId());
+ }
+
/**
* Get a path to store map-reduce intermediate data in.
*
@@ -333,10 +398,9 @@ public class Context {
}
/**
- * Get a path to store tmp data destined for external URI.
+ * Get a path to store tmp data destined for external Path.
*
- * @param extURI
- * external URI to which the tmp data has to be eventually moved
+ * @param path external Path to which the tmp data has to be eventually moved
* @return next available tmp path on the file system corresponding extURI
*/
public Path getExternalTmpPath(Path path) {
@@ -357,9 +421,7 @@ public class Context {
* path within /tmp
*/
public Path getExtTmpPathRelTo(Path path) {
- URI uri = path.toUri();
- return new Path (getScratchDir(uri.getScheme(), uri.getAuthority(), !explain,
- uri.getPath() + Path.SEPARATOR + "_" + this.executionId), EXT_PREFIX + nextPathId());
+ return new Path(getStagingDir(path, !explain), EXT_PREFIX + nextPathId());
}
/**
@@ -437,7 +499,7 @@ public class Context {
resFs = resDir.getFileSystem(conf);
FileStatus status = resFs.getFileStatus(resDir);
assert status.isDir();
- FileStatus[] resDirFS = resFs.globStatus(new Path(resDir + "/*"));
+ FileStatus[] resDirFS = resFs.globStatus(new Path(resDir + "/*"), FileUtils.HIDDEN_FILES_PATH_FILTER);
resDirPaths = new Path[resDirFS.length];
int pos = 0;
for (FileStatus resFS : resDirFS) {
@@ -539,6 +601,13 @@ public class Context {
* Today this translates into running hadoop jobs locally
*/
public boolean isLocalOnlyExecutionMode() {
+ // Always allow spark to run in a cluster mode. Without this, depending on
+ // user's local hadoop settings, true may be returned, which causes plan to be
+ // stored in local path.
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return false;
+ }
+
return ShimLoader.getHadoopShims().isLocalMode(conf);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Jan 23 19:59:11 2015
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.ql.hooks.P
import org.apache.hadoop.hive.ql.hooks.PreExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.hooks.Redactor;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -439,6 +440,11 @@ public class Driver implements CommandPr
SessionState.get().getCommandType());
String queryStr = plan.getQueryStr();
+ List<Redactor> queryRedactors = getHooks(ConfVars.QUERYREDACTORHOOKS, Redactor.class);
+ for (Redactor redactor : queryRedactors) {
+ redactor.setConf(conf);
+ queryStr = redactor.redactQuery(queryStr);
+ }
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
conf.set("mapreduce.workflow.id", "hive_" + queryId);
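
[Editor's note] A hypothetical Redactor implementation, assuming the subclass contract implied by the calls above (setConf(conf) is inherited, redactQuery(String) is the override point); the class name and pattern are illustrative, not part of this commit:

    import java.util.regex.Pattern;

    import org.apache.hadoop.hive.ql.hooks.Redactor;

    public class CreditCardRedactor extends Redactor {
      // mask anything resembling a 16-digit card number before the query
      // string is logged or copied into the job configuration
      private static final Pattern CC =
          Pattern.compile("\\b(?:\\d{4}[- ]?){3}\\d{4}\\b");

      @Override
      public String redactQuery(String query) {
        return CC.matcher(query).replaceAll("<redacted>");
      }
    }

Such a class would be picked up through the hook list that getHooks(ConfVars.QUERYREDACTORHOOKS, Redactor.class) reads.
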
@@ -699,15 +705,13 @@ public class Driver implements CommandPr
|| op.equals(HiveOperation.QUERY)) {
SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
ParseContext parseCtx = querySem.getParseContext();
- Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem
.getParseContext().getTopOps().entrySet()) {
Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
- if (topOp instanceof TableScanOperator
- && tsoTopMap.containsKey(topOp)) {
+ if (topOp instanceof TableScanOperator) {
TableScanOperator tableScanOp = (TableScanOperator) topOp;
- Table tbl = tsoTopMap.get(tableScanOp);
+ Table tbl = tableScanOp.getConf().getTableMetadata();
List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
List<FieldSchema> columns = tbl.getCols();
List<String> cols = new ArrayList<String>();
@@ -1343,7 +1347,8 @@ public class Driver implements CommandPr
}
int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
- + Utilities.getTezTasks(plan.getRootTasks()).size();
+ + Utilities.getTezTasks(plan.getRootTasks()).size()
+ + Utilities.getSparkTasks(plan.getRootTasks()).size();
if (jobs > 0) {
console.printInfo("Query ID = " + plan.getQueryId());
console.printInfo("Total jobs = " + jobs);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java Fri Jan 23 19:59:11 2015
@@ -34,6 +34,8 @@ public class HashTableLoaderFactory {
public static HashTableLoader getLoader(Configuration hconf) {
if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+ } else if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return new org.apache.hadoop.hive.ql.exec.spark.HashTableLoader();
} else {
return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
}
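
[Editor's note] The loader choice keys off ConfVars.HIVE_EXECUTION_ENGINE alone, so the new Spark branch above is reached with a plain session setting:

    SET hive.execution.engine=spark;
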
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Jan 23 19:59:11 2015
@@ -169,8 +169,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ReflectionUtils;
@@ -4110,6 +4108,23 @@ public class DDLTask extends Task<DDLWor
params.putAll(crtTbl.getTblProps());
}
+ if (crtTbl.isUserStorageFormat()) {
+ tbl.setInputFormatClass(crtTbl.getDefaultInputFormat());
+ tbl.setOutputFormatClass(crtTbl.getDefaultOutputFormat());
+ tbl.getTTable().getSd().setInputFormat(
+ tbl.getInputFormatClass().getName());
+ tbl.getTTable().getSd().setOutputFormat(
+ tbl.getOutputFormatClass().getName());
+ if (crtTbl.getDefaultSerName() == null) {
+ LOG.info("Default to LazySimpleSerDe for like table " + crtTbl.getTableName());
+ tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ } else {
+ // let's validate that the serde exists
+ validateSerDe(crtTbl.getDefaultSerName());
+ tbl.setSerializationLib(crtTbl.getDefaultSerName());
+ }
+ }
+
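[Editor's note] This block applies when the user overrides the storage format of a CREATE TABLE ... LIKE statement; a statement that would plausibly exercise it (table names hypothetical):

    CREATE TABLE t2 LIKE t1 STORED AS ORC;
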
tbl.getTTable().setTemporary(crtTbl.isTemporary());
if (crtTbl.isExternal()) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Fri Jan 23 19:59:11 2015
@@ -32,13 +32,13 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
-import java.util.LinkedHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExplainWork;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.security.authorization.AuthorizationFactory;
@@ -428,6 +429,36 @@ public class ExplainTask extends Task<Ex
json.accumulate(ent.getKey().toString(), jsonDep);
}
}
+ } else if (ent.getValue() != null && !((List<?>) ent.getValue()).isEmpty()
+ && ((List<?>) ent.getValue()).get(0) != null &&
+ ((List<?>) ent.getValue()).get(0) instanceof SparkWork.Dependency) {
+ if (out != null) {
+ boolean isFirst = true;
+ for (SparkWork.Dependency dep: (List<SparkWork.Dependency>) ent.getValue()) {
+ if (!isFirst) {
+ out.print(", ");
+ } else {
+ out.print("<- ");
+ isFirst = false;
+ }
+ out.print(dep.getName());
+ out.print(" (");
+ out.print(dep.getShuffleType());
+ out.print(", ");
+ out.print(dep.getNumPartitions());
+ out.print(")");
+ }
+ out.println();
+ }
+ if (jsonOutput) {
+ for (SparkWork.Dependency dep: (List<SparkWork.Dependency>) ent.getValue()) {
+ JSONObject jsonDep = new JSONObject();
+ jsonDep.put("parent", dep.getName());
+ jsonDep.put("type", dep.getShuffleType());
+ jsonDep.put("partitions", dep.getNumPartitions());
+ json.accumulate(ent.getKey().toString(), jsonDep);
+ }
+ }
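[Editor's note] For illustration, with hypothetical vertex names this branch renders a Spark dependency list as "<- Map 1 (GROUP, 2), Map 2 (SORT, 1)" in text mode, and in JSON mode accumulates one {"parent": "Map 1", "type": "GROUP", "partitions": 2} object per dependency.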
} else {
if (out != null) {
out.print(ent.getValue().toString());
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluatorFactory.java Fri Jan 23 19:59:11 2015
@@ -64,6 +64,20 @@ public final class ExprNodeEvaluatorFact
"Cannot find ExprNodeEvaluator for the exprNodeDesc = " + desc);
}
+ public static ExprNodeEvaluator[] toCachedEvals(ExprNodeEvaluator[] evals) {
+ EvaluatorContext context = new EvaluatorContext();
+ for (int i = 0; i < evals.length; i++) {
+ if (evals[i] instanceof ExprNodeGenericFuncEvaluator) {
+ iterate(evals[i], context);
+ if (context.hasReference) {
+ evals[i] = new ExprNodeEvaluatorHead(evals[i]);
+ context.hasReference = false;
+ }
+ }
+ }
+ return evals;
+ }
+
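[Editor's note] The intent, as far as the code above shows: iterate() walks each generic-function evaluator and, through the EvaluatorContext cache (keyed below by ExprNodeDescEqualityWrapper), replaces repeated sub-expressions with references to the first occurrence; when a tree ends up holding such references, its root is wrapped in ExprNodeEvaluatorHead to mark it as the owner of the shared state. So for a hypothetical pair of projections upper(col1) and concat(upper(col1), '!'), the inner upper(col1) would be evaluated once per row and reused.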
/**
* Should be called before eval is initialized
*/
@@ -100,12 +114,14 @@ public final class ExprNodeEvaluatorFact
private static class EvaluatorContext {
- private final Map<String, ExprNodeEvaluator> cached = new HashMap<String, ExprNodeEvaluator>();
+ private final Map<ExprNodeDesc.ExprNodeDescEqualityWrapper, ExprNodeEvaluator> cached =
+ new HashMap<ExprNodeDesc.ExprNodeDescEqualityWrapper, ExprNodeEvaluator>();
private boolean hasReference;
public ExprNodeEvaluator getEvaluated(ExprNodeEvaluator eval) {
- String key = eval.getExpr().toString();
+ ExprNodeDesc.ExprNodeDescEqualityWrapper key =
+ new ExprNodeDesc.ExprNodeDescEqualityWrapper(eval.expr);
ExprNodeEvaluator prev = cached.get(key);
if (prev == null) {
cached.put(key, eval);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Jan 23 19:59:11 2015
@@ -637,10 +637,10 @@ public class FetchOperator implements Se
boolean recursive = HiveConf.getBoolVar(job, HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE);
// If this is in acid format always read it recursively regardless of what the jobconf says.
if (!recursive && !AcidUtils.isAcid(p, job)) {
- return fs.listStatus(p);
+ return fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER);
}
List<FileStatus> results = new ArrayList<FileStatus>();
- for (FileStatus stat : fs.listStatus(p)) {
+ for (FileStatus stat : fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
FileUtils.listStatusRecursively(fs, stat, results);
}
return results.toArray(new FileStatus[results.size()]);
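
[Editor's note] FileUtils.HIDDEN_FILES_PATH_FILTER keeps Hadoop-internal entries out of the fetch results. A rough sketch of the filter's contract (not the actual FileUtils implementation):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    PathFilter hiddenFiles = new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        // skip marker/metadata entries such as _SUCCESS, _logs, .staging
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
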
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Fri Jan 23 19:59:11 2015
@@ -129,7 +129,7 @@ public class FetchTask extends Task<Fetc
rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
}
try {
- if (rowsRet <= 0) {
+ if (rowsRet <= 0 || work.getLimit() == totalRows) {
fetch.clearFetchContext();
return false;
}
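
[Editor's note] The added clause stops fetching as soon as exactly the limit has been returned; e.g., with LIMIT 100, maxRows 50, and 100 rows already fetched, the next call now clears the fetch context immediately instead of probing the source for another, necessarily empty, batch.
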
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Jan 23 19:59:11 2015
@@ -68,11 +68,16 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
/**
* File Sink operator implementation.
**/
@@ -88,6 +93,7 @@ public class FileSinkOperator extends Te
protected transient List<String> dpColNames;
protected transient DynamicPartitionCtx dpCtx;
protected transient boolean isCompressed;
+ protected transient boolean isTemporary;
protected transient Path parent;
protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
protected transient Path specPath;
@@ -318,6 +324,7 @@ public class FileSinkOperator extends Te
this.hconf = hconf;
filesCreated = false;
isNativeTable = !conf.getTableInfo().isNonNative();
+ isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@ public class FileSinkOperator extends Te
valToPaths.put("", fsp); // special entry for non-DP case
}
}
+
+ final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+ .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+ if (isTemporary && fsp != null
+ && tmpStorage != StoragePolicyValue.DEFAULT) {
+ final Path outputPath = fsp.taskOutputTempPath;
+ StoragePolicyShim shim = ShimLoader.getHadoopShims()
+ .getStoragePolicyShim(fs);
+ if (shim != null) {
+ // directory creation is otherwise within the writers
+ fs.mkdirs(outputPath);
+ shim.setStoragePolicy(outputPath, tmpStorage);
+ }
+ }
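[Editor's note] This only fires for temporary-table sinks when HIVE_TEMPORARY_TABLE_STORAGE names a non-default policy and the shim supports storage policies; a plausible session setting, assuming the cluster exposes the memory tier:

    SET hive.exec.temporary.table.storage=memory;
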
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Fri Jan 23 19:59:11 2015
@@ -59,7 +59,7 @@ public class FilterOperator extends Oper
}
conditionInspector = null;
- ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
+ ioContext = IOContext.get(hconf);
} catch (Throwable e) {
throw new HiveException(e);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Fri Jan 23 19:59:11 2015
@@ -41,9 +41,7 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -248,6 +246,7 @@ public final class FunctionRegistry {
registerUDF("reverse", UDFReverse.class, false);
registerGenericUDF("field", GenericUDFField.class);
registerUDF("find_in_set", UDFFindInSet.class, false);
+ registerGenericUDF("initcap", GenericUDFInitCap.class);
registerUDF("like", UDFLike.class, true);
registerUDF("rlike", UDFRegExp.class, true);
@@ -273,10 +272,12 @@ public final class FunctionRegistry {
registerUDF("from_unixtime", UDFFromUnixTime.class, false);
registerGenericUDF("to_date", GenericUDFDate.class);
registerUDF("weekofyear", UDFWeekOfYear.class, false);
+ registerGenericUDF("last_day", GenericUDFLastDay.class);
registerGenericUDF("date_add", GenericUDFDateAdd.class);
registerGenericUDF("date_sub", GenericUDFDateSub.class);
registerGenericUDF("datediff", GenericUDFDateDiff.class);
+ registerGenericUDF("add_months", GenericUDFAddMonths.class);
registerUDF("get_json_object", UDFJson.class, false);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Jan 23 19:59:11 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.reflect.Field;
@@ -34,15 +33,12 @@ import java.util.Set;
import javolution.util.FastBitSet;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -72,115 +68,110 @@ import org.apache.hadoop.io.Text;
/**
* GroupBy operator implementation.
*/
-public class GroupByOperator extends Operator<GroupByDesc> implements
- Serializable {
+public class GroupByOperator extends Operator<GroupByDesc> {
- private static final Log LOG = LogFactory.getLog(GroupByOperator.class
- .getName());
- private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final long serialVersionUID = 1L;
private static final int NUMROWSESTIMATESIZE = 1000;
- protected transient ExprNodeEvaluator[] keyFields;
- protected transient ObjectInspector[] keyObjectInspectors;
+ private transient ExprNodeEvaluator[] keyFields;
+ private transient ObjectInspector[] keyObjectInspectors;
- protected transient ExprNodeEvaluator[][] aggregationParameterFields;
- protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
- protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
- protected transient Object[][] aggregationParameterObjects;
+ private transient ExprNodeEvaluator[][] aggregationParameterFields;
+ private transient ObjectInspector[][] aggregationParameterObjectInspectors;
+ private transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
+ private transient Object[][] aggregationParameterObjects;
+
// so aggregationIsDistinct is a boolean array instead of a single number.
- protected transient boolean[] aggregationIsDistinct;
+ private transient boolean[] aggregationIsDistinct;
// Map from integer tag to distinct aggrs
- transient protected Map<Integer, Set<Integer>> distinctKeyAggrs =
+ private transient Map<Integer, Set<Integer>> distinctKeyAggrs =
new HashMap<Integer, Set<Integer>>();
// Map from integer tag to non-distinct aggrs with key parameters.
- transient protected Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
+ private transient Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
new HashMap<Integer, Set<Integer>>();
// List of non-distinct aggrs.
- transient protected List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
+ private transient List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
// Union expr for distinct keys
- transient ExprNodeEvaluator unionExprEval = null;
+ private transient ExprNodeEvaluator unionExprEval;
- transient GenericUDAFEvaluator[] aggregationEvaluators;
- transient boolean[] estimableAggregationEvaluators;
-
- protected transient ArrayList<ObjectInspector> objectInspectors;
- transient ArrayList<String> fieldNames;
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ private transient boolean[] estimableAggregationEvaluators;
// Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
// MERGEPARTIAL
- protected transient KeyWrapper currentKeys;
- protected transient KeyWrapper newKeys;
- protected transient AggregationBuffer[] aggregations;
- protected transient Object[][] aggregationsParametersLastInvoke;
+ private transient KeyWrapper currentKeys;
+ private transient KeyWrapper newKeys;
+ private transient AggregationBuffer[] aggregations;
+ private transient Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH, PARTIALS
- protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
+ private transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
- protected transient HashSet<KeyWrapper> keysCurrentGroup;
+ private transient HashSet<KeyWrapper> keysCurrentGroup;
- transient boolean firstRow;
- transient long totalMemory;
- protected transient boolean hashAggr;
+ private transient boolean firstRow;
+ private transient boolean hashAggr;
// The reduction is happening on the reducer, and the grouping key and
// reduction keys are different.
// For example: select a, count(distinct b) from T group by a
// The data is sprayed by 'b' and the reducer is grouping it by 'a'
- transient boolean groupKeyIsNotReduceKey;
- transient boolean firstRowInGroup;
- transient long numRowsInput;
- transient long numRowsHashTbl;
- transient int groupbyMapAggrInterval;
- transient long numRowsCompareHashAggr;
- transient float minReductionHashAggr;
+ private transient boolean groupKeyIsNotReduceKey;
+ private transient boolean firstRowInGroup;
+ private transient long numRowsInput;
+ private transient long numRowsHashTbl;
+ private transient int groupbyMapAggrInterval;
+ private transient long numRowsCompareHashAggr;
+ private transient float minReductionHashAggr;
- // current Key ObjectInspectors are standard ObjectInspectors
- protected transient ObjectInspector[] currentKeyObjectInspectors;
- // new Key ObjectInspectors are objectInspectors from the parent
- transient StructObjectInspector newKeyObjectInspector;
- transient StructObjectInspector currentKeyObjectInspector;
- public static MemoryMXBean memoryMXBean;
+ private transient int outputKeyLength;
- /**
- * Total amount of memory allowed for JVM heap.
- */
- protected long maxMemory;
+ // current Key ObjectInspectors are standard ObjectInspectors
+ private transient ObjectInspector[] currentKeyObjectInspectors;
- /**
- * configure percent of memory threshold usable by QP.
- */
- protected float memoryThreshold;
+ private transient MemoryMXBean memoryMXBean;
- private boolean groupingSetsPresent;
- private int groupingSetsPosition;
- private List<Integer> groupingSets;
- private List<FastBitSet> groupingSetsBitSet;
- transient private List<Object> newKeysGroupingSets;
+ private transient boolean groupingSetsPresent; // generates grouping set
+ private transient int groupingSetsPosition; // position of grouping set, generally the last of keys
+ private transient List<Integer> groupingSets; // declared grouping set values
+ private transient FastBitSet[] groupingSetsBitSet; // bitsets acquired from grouping set values
+ private transient Text[] newKeysGroupingSets;
// for these positions, some variable primitive type (String) is used, so size
// cannot be estimated. sample it at runtime.
- transient List<Integer> keyPositionsSize;
+ private transient List<Integer> keyPositionsSize;
// for these positions, some variable primitive type (String) is used for the
// aggregation classes
- transient List<Field>[] aggrPositions;
+ private transient List<Field>[] aggrPositions;
+
+ private transient int fixedRowSize;
+
+ private transient int totalVariableSize;
+ private transient int numEntriesVarSize;
+
+ private transient int countAfterReport; // report or forward
+ private transient int heartbeatInterval;
- transient int fixedRowSize;
+ /**
+ * Total amount of memory allowed for JVM heap.
+ */
+ protected transient long maxMemory;
/**
* Max memory usable by the hashtable before it should flush.
*/
protected transient long maxHashTblMemory;
- transient int totalVariableSize;
- transient int numEntriesVarSize;
+
+ /**
+ * configure percent of memory threshold usable by QP.
+ */
+ protected transient float memoryThreshold;
/**
* Current number of entries in the hash table.
*/
protected transient int numEntriesHashTable;
- transient int countAfterReport; // report or forward
- transient int heartbeatInterval;
public static FastBitSet groupingSet2BitSet(int value) {
FastBitSet bits = new FastBitSet();
@@ -197,7 +188,6 @@ public class GroupByOperator extends Ope
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
- totalMemory = Runtime.getRuntime().totalMemory();
numRowsInput = 0;
numRowsHashTbl = 0;
@@ -226,16 +216,15 @@ public class GroupByOperator extends Ope
if (groupingSetsPresent) {
groupingSets = conf.getListGroupingSets();
groupingSetsPosition = conf.getGroupingSetPosition();
- newKeysGroupingSets = new ArrayList<Object>();
- groupingSetsBitSet = new ArrayList<FastBitSet>();
+ newKeysGroupingSets = new Text[groupingSets.size()];
+ groupingSetsBitSet = new FastBitSet[groupingSets.size()];
+ int pos = 0;
for (Integer groupingSet: groupingSets) {
// Create the mapping corresponding to the grouping set
- ExprNodeEvaluator groupingSetValueEvaluator =
- ExprNodeEvaluatorFactory.get(new ExprNodeConstantDesc(String.valueOf(groupingSet)));
-
- newKeysGroupingSets.add(groupingSetValueEvaluator.evaluate(null));
- groupingSetsBitSet.add(groupingSet2BitSet(groupingSet));
+ newKeysGroupingSets[pos] = new Text(String.valueOf(groupingSet));
+ groupingSetsBitSet[pos] = groupingSet2BitSet(groupingSet);
+ pos++;
}
}
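[Editor's note] A worked example of the convention, assuming key positions follow declaration order: with grouping keys (a, b, c) and grouping set value 5, binary 101, bits 0 and 2 are set; the processing loop further down first nulls the key array and then restores positions 0 and 2 from the original row, yielding (a, null, c) with the Text tag "5" placed at groupingSetsPosition.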
@@ -348,23 +337,12 @@ public class GroupByOperator extends Ope
aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
}
- // init objectInspectors
- int totalFields = keyFields.length + aggregationEvaluators.length;
- objectInspectors = new ArrayList<ObjectInspector>(totalFields);
- for (ExprNodeEvaluator keyField : keyFields) {
- objectInspectors.add(null);
- }
MapredContext context = MapredContext.get();
if (context != null) {
for (GenericUDAFEvaluator genericUDAFEvaluator : aggregationEvaluators) {
context.setup(genericUDAFEvaluator);
}
}
- for (int i = 0; i < aggregationEvaluators.length; i++) {
- ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
- .get(i).getMode(), aggregationParameterObjectInspectors[i]);
- objectInspectors.add(roi);
- }
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
@@ -390,26 +368,25 @@ public class GroupByOperator extends Ope
}
}
- fieldNames = conf.getOutputColumnNames();
+ List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
- for (int i = 0; i < keyFields.length; i++) {
- objectInspectors.set(i, currentKeyObjectInspectors[i]);
- }
+ // grouping id should be pruned, which is the last of key columns
+ // see ColumnPrunerGroupByProc
+ outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
- // Generate key names
- ArrayList<String> keyNames = new ArrayList<String>(keyFields.length);
- for (int i = 0; i < keyFields.length; i++) {
- keyNames.add(fieldNames.get(i));
- }
- newKeyObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(keyNames, Arrays
- .asList(keyObjectInspectors));
- currentKeyObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(keyNames, Arrays
- .asList(currentKeyObjectInspectors));
+ // init objectInspectors
+ ObjectInspector[] objectInspectors =
+ new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
+ for (int i = 0; i < outputKeyLength; i++) {
+ objectInspectors[i] = currentKeyObjectInspectors[i];
+ }
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
+ .get(i).getMode(), aggregationParameterObjectInspectors[i]);
+ }
outputObjInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(fieldNames, objectInspectors);
+ .getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));
KeyWrapperFactory keyWrapperFactory =
new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
@@ -769,7 +746,7 @@ public class GroupByOperator extends Ope
flushHashTable(true);
hashAggr = false;
} else {
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
+ " #total = " + numRowsInput + " reduction = " + 1.0
* (numRowsHashTbl / numRowsInput) + " minReduction = "
@@ -795,14 +772,14 @@ public class GroupByOperator extends Ope
newKeysArray[keyPos] = null;
}
- FastBitSet bitset = groupingSetsBitSet.get(groupingSetPos);
+ FastBitSet bitset = groupingSetsBitSet[groupingSetPos];
// Some keys need to be left to null corresponding to that grouping set.
for (int keyPos = bitset.nextSetBit(0); keyPos >= 0;
keyPos = bitset.nextSetBit(keyPos+1)) {
newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
}
- newKeysArray[groupingSetsPosition] = newKeysGroupingSets.get(groupingSetPos);
+ newKeysArray[groupingSetsPosition] = newKeysGroupingSets[groupingSetPos];
processKey(row, rowInspector);
}
} else {
@@ -972,7 +949,7 @@ public class GroupByOperator extends Ope
// Update the number of entries that can fit in the hash table
numEntriesHashTable =
(int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Hash Aggr: #hash table = " + numEntries
+ " #max in hash table = " + numEntriesHashTable);
}
@@ -1054,19 +1031,17 @@ public class GroupByOperator extends Ope
* The keys in the record
* @throws HiveException
*/
- protected void forward(Object[] keys,
- AggregationBuffer[] aggs) throws HiveException {
+ private void forward(Object[] keys, AggregationBuffer[] aggs) throws HiveException {
- int totalFields = keys.length + aggs.length;
if (forwardCache == null) {
- forwardCache = new Object[totalFields];
+ forwardCache = new Object[outputKeyLength + aggs.length];
}
- for (int i = 0; i < keys.length; i++) {
+ for (int i = 0; i < outputKeyLength; i++) {
forwardCache[i] = keys[i];
}
for (int i = 0; i < aggs.length; i++) {
- forwardCache[keys.length + i] = aggregationEvaluators[i].evaluate(aggs[i]);
+ forwardCache[outputKeyLength + i] = aggregationEvaluators[i].evaluate(aggs[i]);
}
forward(forwardCache, outputObjInspector);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Jan 23 19:59:11 2015
@@ -91,19 +91,16 @@ public class HashTableSinkOperator exten
private transient List<ObjectInspector>[] joinFilterObjectInspectors;
private transient Byte[] order; // order in which the results should
- private Configuration hconf;
+ protected Configuration hconf;
- private transient MapJoinPersistableTableContainer[] mapJoinTables;
- private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+ protected transient MapJoinPersistableTableContainer[] mapJoinTables;
+ protected transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+
+ private final Object[] emptyObjectArray = new Object[0];
+ private final MapJoinEagerRowContainer emptyRowContainer = new MapJoinEagerRowContainer();
- private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
- private static final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer();
- static {
- EMPTY_ROW_CONTAINER.addRow(EMPTY_OBJECT_ARRAY);
- }
-
private long rowNumber = 0;
- private transient LogHelper console;
+ protected transient LogHelper console;
private long hashTableScale;
private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
@@ -121,6 +118,7 @@ public class HashTableSinkOperator exten
boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
console = new LogHelper(LOG, isSilent);
memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
+ emptyRowContainer.addRow(emptyObjectArray);
// for small tables only; so get the big table position first
posBigTableAlias = conf.getPosBigTable();
@@ -231,7 +229,7 @@ public class HashTableSinkOperator exten
MapJoinKeyObject key = new MapJoinKeyObject();
key.readFromRow(currentKey, joinKeysObjectInspectors[alias]);
- Object[] value = EMPTY_OBJECT_ARRAY;
+ Object[] value = emptyObjectArray;
if((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
@@ -244,14 +242,14 @@ public class HashTableSinkOperator exten
rowContainer = new MapJoinEagerRowContainer();
rowContainer.addRow(value);
} else {
- rowContainer = EMPTY_ROW_CONTAINER;
+ rowContainer = emptyRowContainer;
}
rowNumber++;
if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
}
tableContainer.put(key, rowContainer);
- } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+ } else if (rowContainer == emptyRowContainer) {
rowContainer = rowContainer.copy();
rowContainer.addRow(value);
tableContainer.put(key, rowContainer);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Jan 23 19:59:11 2015
@@ -583,15 +583,15 @@ public class MapOperator extends Operato
}
}
else if(vc.equals(VirtualColumn.ROWID)) {
- if(ctx.getIoCxt().ri == null) {
+ if(ctx.getIoCxt().getRecordIdentifier() == null) {
vcValues[i] = null;
}
else {
if(vcValues[i] == null) {
vcValues[i] = new Object[RecordIdentifier.Field.values().length];
}
- RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]);
- ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't
+ RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[])vcValues[i]);
+ ctx.getIoCxt().setRecordIdentifier(null);//so we don't accidentally cache the value; shouldn't
//happen since IO layer either knows how to produce ROW__ID or not - but to be safe
}
}
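
The ROW__ID hunk reads the record identifier through an accessor and immediately nulls it, a consume-once pattern. A small sketch of that pattern with stand-in types (IoContext here is hypothetical, not the Hive IOContext class):

    final class IoContext {
      private Object recordIdentifier;
      Object getRecordIdentifier() { return recordIdentifier; }
      void setRecordIdentifier(Object ri) { recordIdentifier = ri; }
    }

    final class RowIdReader {
      static Object takeRowId(IoContext ctx) {
        Object ri = ctx.getRecordIdentifier();
        if (ri != null) {
          ctx.setRecordIdentifier(null);  // consume once; never cache a stale id
        }
        return ri;
      }
    }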
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Fri Jan 23 19:59:11 2015
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -58,6 +59,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
@@ -99,7 +102,7 @@ public class MoveTask extends Task<MoveW
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
deletePath = createTargetPath(targetPath, fs);
}
- if (!Hive.renameFile(conf, sourcePath, targetPath, fs, true, false)) {
+ if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
try {
if (deletePath != null) {
fs.delete(deletePath, true);
@@ -145,7 +148,7 @@ public class MoveTask extends Task<MoveW
private Path createTargetPath(Path targetPath, FileSystem fs) throws IOException {
Path deletePath = null;
Path mkDirPath = targetPath.getParent();
- if (mkDirPath != null & !fs.exists(mkDirPath)) {
+ if (mkDirPath != null && !fs.exists(mkDirPath)) {
Path actualPath = mkDirPath;
// targetPath path is /x/y/z/1/2/3 here /x/y/z is present in the file system
// create the structure till /x/y/z/1/2 to work rename for multilevel directory
@@ -158,8 +161,14 @@ public class MoveTask extends Task<MoveW
actualPath = actualPath.getParent();
}
fs.mkdirs(mkDirPath);
+ HadoopShims shims = ShimLoader.getHadoopShims();
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
- fs.setPermission(mkDirPath, fs.getFileStatus(actualPath).getPermission());
+ try {
+ HadoopShims.HdfsFileStatus status = shims.getFullFileStatus(conf, fs, actualPath);
+ shims.setFullFileStatus(conf, status, fs, actualPath);
+ } catch (Exception e) {
+ LOG.warn("Error setting permissions or group of " + actualPath, e);
+ }
}
}
return deletePath;
@@ -259,7 +268,7 @@ public class MoveTask extends Task<MoveW
dirs = srcFs.globStatus(tbd.getSourcePath());
files = new ArrayList<FileStatus>();
for (int i = 0; (dirs != null && i < dirs.length); i++) {
- files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath())));
+ files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
// We only check one file, so exit the loop when we have at least
// one.
if (files.size() > 0) {
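
The listing hunk above now passes FileUtils.HIDDEN_FILES_PATH_FILTER so metadata entries are skipped. A hedged sketch of what such a filter is generally expected to do, following the usual Hadoop convention that names starting with '_' or '.' (e.g. _SUCCESS, .staging) are not data; the exact rule lives in FileUtils:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public final class HiddenFilesFilterSketch {
      public static final PathFilter HIDDEN_FILES = new PathFilter() {
        @Override
        public boolean accept(Path p) {
          String name = p.getName();
          // Convention: '_' and '.' prefixes mark framework metadata, not data.
          return !name.startsWith("_") && !name.startsWith(".");
        }
      };
    }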
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Jan 23 19:59:11 2015
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;

import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
@@ -103,6 +106,8 @@ public final class OperatorFactory {
HashTableDummyOperator.class));
opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
HashTableSinkOperator.class));
+ opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
+ SparkHashTableSinkOperator.class));
opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
DummyStoreOperator.class));
opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Fri Jan 23 19:59:11 2015
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkFiles;
/**
* ScriptOperator.
@@ -214,6 +215,7 @@ public class ScriptOperator extends Oper
if (pathenv == null || pathSep == null || fileSep == null) {
return null;
}
+
int val = -1;
String classvalue = pathenv + pathSep;
@@ -332,6 +334,11 @@ public class ScriptOperator extends Oper
if (!new File(prog).isAbsolute()) {
PathFinder finder = new PathFinder("PATH");
finder.prependPathComponent(currentDir.toString());
+
+ // In spark local mode, we need to search added files in root directory.
+ if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ finder.prependPathComponent(SparkFiles.getRootDirectory());
+ }
File f = finder.getAbsolutePath(prog);
if (f != null) {
cmdArgs[0] = f.getAbsolutePath();
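
The ScriptOperator hunk prepends SparkFiles.getRootDirectory() because, under Spark, files shipped with the job via addFile are materialized in that directory. A small sketch of resolving a script against it; resolve() and the lookup order are illustrative, not the PathFinder API:

    import java.io.File;
    import org.apache.spark.SparkFiles;

    public final class SparkScriptLookupSketch {
      public static File resolve(String prog) {
        File f = new File(prog);
        if (!f.isAbsolute()) {
          // Files added via SparkContext.addFile(...) land under this root.
          File inSparkRoot = new File(SparkFiles.getRootDirectory(), prog);
          if (inSparkRoot.isFile()) {
            return inSparkRoot;
          }
        }
        return f;
      }
    }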
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Fri Jan 23 19:59:11 2015
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
* Select operator implementation.
@@ -55,12 +54,12 @@ public class SelectOperator extends Oper
for (int i = 0; i < colList.size(); i++) {
assert (colList.get(i) != null);
eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
- if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
- eval[i] = ExprNodeEvaluatorFactory.toCachedEval(eval[i]);
- }
+ }
+ if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
+ eval = ExprNodeEvaluatorFactory.toCachedEvals(eval);
}
output = new Object[eval.length];
- LOG.info("SELECT " + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+ LOG.info("SELECT " + inputObjInspectors[0].getTypeName());
outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
inputObjInspectors[0]);
initializeChildren(hconf);
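
The SelectOperator hunk replaces per-element toCachedEval calls with one batch toCachedEvals over the whole array, which presumably lets duplicate expressions share a single memoized evaluator — something a per-element wrap cannot express. A hedged, self-contained sketch of that idea (Evaluator and CachedEvaluator are illustrative types, not Hive's ExprNodeEvaluator API):

    import java.util.HashMap;
    import java.util.Map;

    interface Evaluator { Object evaluate(Object row); }

    final class CachedEvaluator implements Evaluator {
      private final Evaluator inner;
      private Object lastRow;
      private Object lastResult;
      private boolean primed;
      CachedEvaluator(Evaluator inner) { this.inner = inner; }
      @Override public Object evaluate(Object row) {
        if (!primed || row != lastRow) {  // recompute once per distinct row object
          lastResult = inner.evaluate(row);
          lastRow = row;
          primed = true;
        }
        return lastResult;
      }
    }

    final class EvaluatorCache {
      // Batch conversion: duplicate evaluators map to one cached instance.
      static Evaluator[] toCachedEvals(Evaluator[] evals) {
        Map<Evaluator, Evaluator> shared = new HashMap<>();
        Evaluator[] out = new Evaluator[evals.length];
        for (int i = 0; i < evals.length; i++) {
          out[i] = shared.computeIfAbsent(evals[i], CachedEvaluator::new);
        }
        return out;
      }
    }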
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Fri Jan 23 19:59:11 2015
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.Fu
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -103,6 +105,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<IndexMetadataChangeWork>(IndexMetadataChangeWork.class,
IndexMetadataChangeTask.class));
taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
+ taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Jan 23 19:59:11 2015
@@ -94,6 +94,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -112,6 +113,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -360,17 +362,21 @@ public final class Utilities {
InputStream in = null;
try {
path = getPlanPath(conf, name);
+ LOG.info("PLAN PATH = " + path);
assert path != null;
- if (!gWorkMap.containsKey(path)) {
+ if (!gWorkMap.containsKey(path)
+ || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
Path localPath;
if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
localPath = new Path(name);
} else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
localPath = path;
} else {
+ LOG.info("***************non-local mode***************");
localPath = new Path(name);
}
-
+ localPath = path;
+ LOG.info("local path = " + localPath);
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
LOG.debug("Loading plan from string: "+path.toUri().getPath());
String planString = conf.get(path.toUri().getPath());
@@ -382,7 +388,8 @@ public final class Utilities {
in = new ByteArrayInputStream(planBytes);
in = new InflaterInputStream(in);
} else {
- in = new FileInputStream(localPath.toUri().getPath());
+ LOG.info("Open file to read in plan: " + localPath);
+ in = localPath.getFileSystem(conf).open(localPath);
}
if(MAP_PLAN_NAME.equals(name)){
@@ -416,6 +423,7 @@ public final class Utilities {
return gWork;
} catch (FileNotFoundException fnf) {
// happens. e.g.: no reduce work.
+ LOG.info("File not found: " + fnf.getMessage());
LOG.info("No plan file found: "+path);
return null;
} catch (Exception e) {
@@ -967,6 +975,23 @@ public final class Utilities {
}
/**
+ * Clones using the powers of XML. Do not use unless necessary.
+ * @param plan The plan.
+ * @return The clone.
+ */
+ public static BaseWork cloneBaseWork(BaseWork plan) {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ Configuration conf = new HiveConf();
+ serializePlan(plan, baos, conf, true);
+ BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ plan.getClass(), conf, true);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ return newPlan;
+ }
+
+ /**
* Serialize the object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
@@ -1042,7 +1067,6 @@ public final class Utilities {
removeField(kryo, Operator.class, "colExprMap");
removeField(kryo, ColumnInfo.class, "objectInspector");
removeField(kryo, MapWork.class, "opParseCtxMap");
- removeField(kryo, MapWork.class, "joinTree");
return kryo;
};
};
@@ -1777,7 +1801,7 @@ public final class Utilities {
*/
public static FileStatus[] listStatusIfExists(Path path, FileSystem fs) throws IOException {
try {
- return fs.listStatus(path);
+ return fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
} catch (FileNotFoundException e) {
// FS in hadoop 2.0 throws FNF instead of returning null
return null;
@@ -2613,7 +2637,7 @@ public final class Utilities {
FileSystem inpFs = dirPath.getFileSystem(job);
if (inpFs.exists(dirPath)) {
- FileStatus[] fStats = inpFs.listStatus(dirPath);
+ FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (fStats.length > 0) {
return false;
}
@@ -2640,6 +2664,27 @@ public final class Utilities {
}
}
}
+
+ public static List<SparkTask> getSparkTasks(List<Task<? extends Serializable>> tasks) {
+ List<SparkTask> sparkTasks = new ArrayList<SparkTask>();
+ if (tasks != null) {
+ getSparkTasks(tasks, sparkTasks);
+ }
+ return sparkTasks;
+ }
+
+ private static void getSparkTasks(List<Task<? extends Serializable>> tasks,
+ List<SparkTask> sparkTasks) {
+ for (Task<? extends Serializable> task : tasks) {
+ if (task instanceof SparkTask && !sparkTasks.contains(task)) {
+ sparkTasks.add((SparkTask) task);
+ }
+
+ if (task.getDependentTasks() != null) {
+ getSparkTasks(task.getDependentTasks(), sparkTasks);
+ }
+ }
+ }
public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
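
The new cloneBaseWork above clones a plan by round-tripping it through the serializer. A self-contained sketch of the same clone-by-serialization idiom, using plain Java serialization for illustration; the real method goes through Hive's plan serializer, not ObjectOutputStream:

    import java.io.*;

    final class DeepCopy {
      @SuppressWarnings("unchecked")
      static <T extends Serializable> T viaSerialization(T plan) throws Exception {
        // Write the whole object graph to a buffer...
        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
          out.writeObject(plan);
        }
        // ...then read it back as an independent deep copy.
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
          return (T) in.readObject();
        }
      }
    }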
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Fri Jan 23 19:59:11 2015
@@ -242,7 +242,7 @@ public class ExecDriver extends Task<Map
job.setPartitionerClass((Class<? extends Partitioner>) (Class.forName(HiveConf.getVar(job,
HiveConf.ConfVars.HIVEPARTITIONER))));
} catch (ClassNotFoundException e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e.getMessage(), e);
}
if (mWork.getNumMapTasks() != null) {
@@ -288,7 +288,7 @@ public class ExecDriver extends Task<Map
try {
job.setInputFormat((Class<? extends InputFormat>) (Class.forName(inpFormat)));
} catch (ClassNotFoundException e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e.getMessage(), e);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Fri Jan 23 19:59:11 2015
@@ -63,7 +63,7 @@ public class ExecMapperContext {
public ExecMapperContext(JobConf jc) {
this.jc = jc;
- ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
+ ioCxt = IOContext.get(jc);
}
public void clear() {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Jan 23 19:59:11 2015
@@ -58,8 +58,6 @@ public class HashMapWrapper extends Abst
private static final float LOADFACTOR = 0.75f;
private final HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
private MapJoinKey lastKey = null;
- private final boolean useLazyRows;
- private final boolean useOptimizedKeys;
private Output output = new Output(0); // Reusable output for serialization
public HashMapWrapper(Map<String, String> metaData) {
@@ -67,30 +65,24 @@ public class HashMapWrapper extends Abst
int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
- useLazyRows = useOptimizedKeys = false;
}
public HashMapWrapper() {
this(HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT.defaultFloatVal,
HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD.defaultIntVal,
- HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, false, false, -1);
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, -1);
}
public HashMapWrapper(Configuration hconf, long keyCount) {
this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
- HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
- HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE),
- HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS), keyCount);
+ HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), keyCount);
}
- private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor,
- boolean useLazyRows, boolean useOptimizedKeys, long keyCount) {
+ private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor, long keyCount) {
super(createConstructorMetaData(threshold, loadFactor));
threshold = calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
- this.useLazyRows = useLazyRows;
- this.useOptimizedKeys = useOptimizedKeys;
}
public static int calculateTableSize(
@@ -131,21 +123,14 @@ public class HashMapWrapper extends Abst
public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey,
MapJoinObjectSerDeContext valueContext, Writable currentValue)
throws SerDeException, HiveException {
- // We pass key in as reference, to find out quickly if optimized keys can be used.
- // However, we do not reuse the object since we are putting them into the hashmap.
- // Later, we don't create optimized keys in MapJoin if hash map doesn't have optimized keys.
- if (lastKey == null && !useOptimizedKeys) {
- lastKey = new MapJoinKeyObject();
- }
-
- lastKey = MapJoinKey.read(output, lastKey, keyContext, currentKey, false);
- LazyFlatRowContainer values = (LazyFlatRowContainer)get(lastKey);
+ MapJoinKey key = MapJoinKey.read(output, keyContext, currentKey);
+ FlatRowContainer values = (FlatRowContainer)get(key);
if (values == null) {
- values = new LazyFlatRowContainer();
- put(lastKey, values);
+ values = new FlatRowContainer();
+ put(key, values);
}
- values.add(valueContext, (BytesWritable)currentValue, useLazyRows);
- return lastKey;
+ values.add(valueContext, (BytesWritable)currentValue);
+ return key;
}
@Override
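
With the lazy-row and optimized-key switches removed, putRow reduces to a get-or-create-then-append over the hash map. A minimal sketch of that idiom with placeholder types (a String key and byte[] row stand in for MapJoinKey and the serialized value):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class MultiRowTable {
      private final Map<String, List<byte[]>> table = new HashMap<>();

      void putRow(String key, byte[] serializedRow) {
        // Create the container on first sight of the key, then append.
        table.computeIfAbsent(key, k -> new ArrayList<>()).add(serializedRow);
      }
    }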
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java Fri Jan 23 19:59:11 2015
@@ -115,7 +115,6 @@ public class MapJoinEagerRowContainer
public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
throws IOException, SerDeException {
- clearRows();
long numRows = in.readLong();
for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
container.readFields(in);
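
With clearRows() gone, read(...) appends instead of replacing, so reading the same container from several streams accumulates all of their rows rather than keeping only the last stream's. A tiny self-contained sketch of an appending reader (String rows stand in for the Writable containers):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    final class AppendingContainer {
      final List<String> rows = new ArrayList<>();

      // No clear() first: calling read(...) once per input stream merges
      // every stream's rows into this one container.
      void read(DataInputStream in) throws IOException {
        long numRows = in.readLong();
        for (long i = 0; i < numRows; i++) {
          rows.add(in.readUTF());
        }
      }
    }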