You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/05 19:31:16 UTC
hive git commit: HIVE-13620: Merge llap branch work to master
(committing changes from review feedback)
Repository: hive
Updated Branches:
refs/heads/llap e05790973 -> 2a03f1f46
HIVE-13620: Merge llap branch work to master (committing changes from review feedback)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a03f1f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a03f1f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a03f1f4
Branch: refs/heads/llap
Commit: 2a03f1f4648c683414c0b23be0aebbfd614d105c
Parents: e057909
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu May 5 12:29:14 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu May 5 12:29:14 2016 -0700
----------------------------------------------------------------------
.../hive/llap/ext/TestLlapInputSplit.java | 18 ++
.../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 37 +----
.../org/apache/hadoop/hive/ql/QTestUtil.java | 3 +-
.../hadoop/hive/llap/LlapBaseRecordReader.java | 38 +++--
.../hadoop/hive/llap/LlapRowRecordReader.java | 48 ++++--
.../apache/hadoop/hive/llap/SubmitWorkInfo.java | 16 ++
.../ext/LlapTaskUmbilicalExternalClient.java | 46 +++---
.../helpers/LlapTaskUmbilicalServer.java | 16 ++
.../hadoop/hive/llap/LlapBaseInputFormat.java | 20 +--
.../org/apache/hadoop/hive/llap/LlapDump.java | 11 +-
.../hadoop/hive/llap/LlapRowInputFormat.java | 18 ++
.../hive/llap/daemon/impl/LlapDaemon.java | 2 +-
.../llap/daemon/impl/TaskRunnerCallable.java | 6 +-
.../daemon/impl/TaskExecutorTestHelpers.java | 2 +-
.../hadoop/hive/llap/LlapDataOutputBuffer.java | 165 -------------------
.../hive/llap/LlapOutputFormatService.java | 27 +--
.../hive/ql/exec/tez/HiveSplitGenerator.java | 4 +-
.../hive/ql/io/HivePassThroughRecordWriter.java | 4 -
.../hive/ql/parse/TypeCheckProcFactory.java | 9 +-
.../ql/udf/generic/GenericUDTFGetSplits.java | 1 -
.../org/apache/tez/dag/api/TaskSpecBuilder.java | 17 +-
.../hadoop/hive/llap/TestLlapOutputFormat.java | 2 +-
.../results/clientpositive/show_functions.q.out | 1 +
23 files changed, 209 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
index 8264190..1de8aa6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.llap.ext;
import java.io.ByteArrayInputStream;
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 5b4ba49..48b9493 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -161,39 +161,7 @@ public class TestJdbcWithMiniLlap {
stmt.close();
}
- private static boolean timedOut = false;
-
- private static class TestTimerTask extends TimerTask {
- private boolean timedOut = false;
- private Thread threadToInterrupt;
-
- public TestTimerTask(Thread threadToInterrupt) {
- this.threadToInterrupt = threadToInterrupt;
- }
-
- @Override
- public void run() {
- System.out.println("Test timed out!");
- timedOut = true;
- threadToInterrupt.interrupt();
- }
-
- public boolean isTimedOut() {
- return timedOut;
- }
-
- public void setTimedOut(boolean timedOut) {
- this.timedOut = timedOut;
- }
-
- }
-
private int getLlapIFRowCount(String query, int numSplits) throws Exception {
- // Add a timer task to stop this test if it has not finished in a reasonable amount of time.
- Timer timer = new Timer();
- long delay = 30000;
- TestTimerTask timerTask = new TestTimerTask(Thread.currentThread());
- timer.schedule(timerTask, delay);
// Setup LlapInputFormat
String url = miniHS2.getJdbcURL();
@@ -245,13 +213,10 @@ public class TestJdbcWithMiniLlap {
}
}
- timer.cancel();
- assertFalse("Test timed out", timerTask.isTimedOut());
-
return rowCount;
}
- @Test
+ @Test(timeout = 60000)
public void testLlapInputFormatEndToEnd() throws Exception {
createTestTable("testtab1");
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 3f8c3c5..a6e8efa 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1468,8 +1468,7 @@ public class QTestUtil {
".*Input:.*/data/files/.*",
".*Output:.*/data/files/.*",
".*total number of created files now is.*",
- ".*.hive-staging.*",
- "table_.*"
+ ".*.hive-staging.*"
});
private final Pattern[] partialReservedPlanMask = toPattern(new String[] {
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 0cd9672..3c858a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.DataInputStream;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -37,22 +39,27 @@ import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Base LLAP RecordReader to handle receiving of the data from the LLAP daemon.
+ */
public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
- DataInputStream din;
- Schema schema;
- Class<V> clazz;
-
+ protected final DataInputStream din;
+ protected final Schema schema;
+ protected final Class<V> clazz;
protected Thread readerThread = null;
- protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+ protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+ protected final long timeout;
- public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
+ public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job) {
din = new DataInputStream(in);
this.schema = schema;
this.clazz = clazz;
this.readerThread = Thread.currentThread();
+ this.timeout = 3 * HiveConf.getTimeVar(job,
+ HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
public Schema getSchema() {
@@ -65,10 +72,16 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
}
@Override
- public long getPos() { return 0; }
+ public long getPos() {
+ // dummy impl
+ return 0;
+ }
@Override
- public float getProgress() { return 0f; }
+ public float getProgress() {
+ // dummy impl
+ return 0f;
+ }
@Override
public NullWritable createKey() {
@@ -106,7 +119,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
} catch (IOException io) {
if (Thread.interrupted()) {
// Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+ // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
// 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
// Either way we should not try to block trying to read the reader events queue.
if (readerEvents.isEmpty()) {
@@ -186,9 +199,12 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
}
}
- protected ReaderEvent getReaderEvent() {
+ protected ReaderEvent getReaderEvent() throws IOException {
try {
- ReaderEvent event = readerEvents.take();
+ ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
+ if (event == null) {
+ throw new IOException("Timed out getting readerEvents");
+ }
return event;
} catch (InterruptedException ie) {
throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index 4e000ff..084da0a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.llap;
import com.google.common.base.Preconditions;
@@ -32,21 +50,29 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+/**
+ * Row-based record reader for LLAP.
+ */
public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
- Configuration conf;
- RecordReader<NullWritable, Text> reader;
- Schema schema;
- SerDe serde;
- final Text textData = new Text();
+ protected final Configuration conf;
+ protected final RecordReader<NullWritable, Text> reader;
+ protected final Schema schema;
+ protected final SerDe serde;
+ protected final Text textData = new Text();
- public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
+ public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) throws IOException {
this.conf = conf;
this.schema = schema;
this.reader = reader;
+
+ try {
+ serde = initSerDe(conf);
+ } catch (SerDeException err) {
+ throw new IOException(err);
+ }
}
@Override
@@ -78,14 +104,6 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
public boolean next(NullWritable key, Row value) throws IOException {
Preconditions.checkArgument(value != null);
- if (serde == null) {
- try {
- serde = initSerDe(conf);
- } catch (SerDeException err) {
- throw new IOException(err);
- }
- }
-
boolean hasNext = reader.next(key, textData);
if (hasNext) {
// Deserialize Text to column values, and populate the row record
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index 83149ab..6704294 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.llap;
import java.io.DataInput;
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 6e2c85d..0edb1cd 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.llap.ext;
import java.io.IOException;
@@ -83,10 +99,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
}
}
- // TODO KKK Work out the details of the tokenIdentifier, and the session token.
- // It may just be possible to create one here - since Shuffle is not involved, and this is only used
- // for communication from LLAP-Daemons to the server. It will need to be sent in as part
- // of the job submission request.
public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
super(LlapTaskUmbilicalExternalClient.class.getName());
@@ -96,9 +108,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
this.sessionToken = sessionToken;
this.responder = responder;
this.timer = new ScheduledThreadPoolExecutor(1);
- this.connectionTimeout = HiveConf.getTimeVar(conf,
+ this.connectionTimeout = 3 * HiveConf.getTimeVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
+ // No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
this.communicator = new LlapProtocolClientProxy(1, conf, null);
this.communicator.init(conf);
}
@@ -173,24 +185,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
responder.submissionFailed(fragmentId, err);
}
});
-
-
-
-
-
-// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
-// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
-// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
-// .newBuilder()
-// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
-// .build();
-// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
-// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
-// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
-// setSrcName(TODO)
-// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
-
-
}
private void updateHeartbeatInfo(String taskAttemptId) {
@@ -261,7 +255,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
responder.heartbeatTimeout(timedOutTask);
pendingEvents.remove(timedOutTask);
- // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
}
timedOutTasks.clear();
@@ -277,7 +270,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
responder.heartbeatTimeout(timedOutTask);
registeredTasks.remove(timedOutTask);
- // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
}
}
}
@@ -291,7 +283,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
- // TODO Ideally, the server should be shared across all client sessions running on the same node.
+ // Ideally, the server should be shared across all client sessions running on the same node.
private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
index dbd591a..79800da 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hive.llap.tezplugins.helpers;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 988002f..4306c22 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -97,6 +97,9 @@ import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+/**
+ * Base LLAP input format to handle requesting of splits and communication with LLAP daemon.
+ */
public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
@@ -178,7 +181,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
LOG.info("Registered id: " + id);
- LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job);
umbilicalResponder.setRecordReader(recordReader);
return recordReader;
}
@@ -312,21 +315,6 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
Credentials taskCredentials = new Credentials();
// Credentials can change across DAGs. Ideally construct only once per DAG.
- // TODO Figure out where credentials will come from. Normally Hive sets up
- // URLs on the tez dag, for which Tez acquires credentials.
-
- // taskCredentials.addAll(getContext().getCredentials());
-
- // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
- // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
- // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
- // if (credentialsBinary == null) {
- // credentialsBinary = serializeCredentials(getContext().getCredentials());
- // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
- // } else {
- // credentialsBinary = credentialsBinary.duplicate();
- // }
- // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
Credentials credentials = new Credentials();
TokenCache.setSessionToken(token, credentials);
ByteBuffer credentialsBinary = serializeCredentials(credentials);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
index d485bfa..08ad1f5 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -57,6 +57,11 @@ import org.apache.hadoop.hive.llap.LlapRowRecordReader;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.hive.llap.Schema;
+/**
+ * Utility to test query and data retrieval via the LLAP input format.
+ * llapdump --hiveconf hive.zookeeper.quorum=localhost --hiveconf hive.zookeeper.client.port=2181 --hiveconf hive.llap.daemon.service.hosts=@llap_MiniLlapCluster 'select * from employee where employee_id < 10'
+ *
+ */
public class LlapDump {
private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class);
@@ -64,7 +69,7 @@ public class LlapDump {
private static String url = "jdbc:hive2://localhost:10000/default";
private static String user = "hive";
private static String pwd = "";
- private static String query = "select * from test";
+ private static String query = null;
private static String numSplits = "1";
public static void main(String[] args) throws Exception {
@@ -99,6 +104,10 @@ public class LlapDump {
query = cli.getArgs()[0];
}
+ if (query == null) {
+ throw new IllegalArgumentException("No query string specified");
+ }
+
System.out.println("url: "+url);
System.out.println("user: "+user);
System.out.println("query: "+query);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
index 56ad555..7efc711 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.llap;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 8b2b978..223c390 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -343,7 +343,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
if (webServices != null) {
getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort());
}
- LlapOutputFormatService.get();
+ getConfig().setInt(ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.varname, LlapOutputFormatService.get().getPort());
this.registry.init(getConfig());
this.registry.start();
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 8594ee1..74359fa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -141,8 +141,10 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
this.amReporter = amReporter;
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
- this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+ if (amReporter != null && jobToken != null) {
+ this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
+ vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
+ }
this.metrics = metrics;
this.requestId = taskSpec.getTaskAttemptID().toString();
// TODO Change this to the queryId/Name when that's available.
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index d699f20..279baf1 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -147,7 +147,7 @@ public class TaskExecutorTestHelpers {
public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
boolean canFinish, long workTime) {
super(requestProto, fragmentInfo, new Configuration(),
- new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
+ new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock(
LlapDaemonExecutorMetrics.class),
mock(KilledTaskHandler.class), mock(
FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
deleted file mode 100644
index aad8968..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.llap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-/**
- * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
- * synchronized modifiers.
- */
-public class LlapDataOutputBuffer implements DataOutput {
-
- int readOffset;
- int writeOffset;
- byte[] buffer;
-
- /** Constructs a new empty buffer. */
- public LlapDataOutputBuffer(int length) {
- buffer = new byte[length];
- reset();
- }
-
- /**
- * Returns the current contents of the buffer. Data is only valid to
- * {@link #getLength()}.
- */
- public byte[] getData() {
- return buffer;
- }
-
- /** Returns the length of the valid data currently in the buffer. */
- public int getLength() {
- return (writeOffset - readOffset) % buffer.length;
- }
-
- /** Resets the buffer to empty. */
- public LlapDataOutputBuffer reset() {
- readOffset = 0;
- writeOffset = 0;
- return this;
- }
-
- /** Writes bytes from a DataInput directly into the buffer. */
- public void write(DataInput in, int length) throws IOException {
- //
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- while (readOffset == writeOffset) {
- try {
- wait();
- } catch(InterruptedException e) {
- }
- }
- buffer[writeOffset] = (byte)b;
- writeOffset = (writeOffset + 1) % buffer.length;
- notify();
- }
-
- public synchronized int read() throws IOException {
- while (readOffset == writeOffset) {
- try {
- wait();
- } catch(InterruptedException e) {
- }
- }
- int b = buffer[readOffset];
- readOffset = (readOffset + 1) % buffer.length;
- notify();
- return b;
- }
-
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- while(len-- != 0) {
- write(b[off++]);
- }
- }
-
- @Override
- public void write(byte b[]) throws IOException {
- write(b, 0, b.length);
- }
-
-
- @Override
- public void writeBoolean(boolean v) throws IOException {
- write(v?1:0);
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- write(v);
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- write(v);
- }
-
- @Override
- public void writeBytes(String v) throws IOException {
- write(v.getBytes(), 0, v.length());
- }
-
- @Override
- public void writeChars(String v) throws IOException {
- write(v.getBytes(), 0, v.length());
- }
-
- @Override
- public void writeDouble(double v) throws IOException {
- write(ByteBuffer.allocate(8).putDouble(v).array(),0,8);
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- write(ByteBuffer.allocate(4).putFloat(v).array(),0,4);
- }
-
- @Override
- public void writeInt(int v) throws IOException {
- write(v);
- write(v>>>8);
- write(v>>>16);
- write(v>>>24);
- }
-
- @Override
- public void writeLong(long v) throws IOException {
- int v1 = (int)v;
- int v2 = (int)v>>>32;
- write(v1);
- write(v2);
- }
-
- @Override
- public void writeShort(int v) throws IOException {
- write(v);
- write(v>>>8);
- }
-
- @Override
- public void writeUTF(String v) throws IOException {
- write(v.getBytes(), 0, v.length());
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index b39f085..6adbf7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +61,7 @@ import io.netty.util.concurrent.Future;
/**
- *
+ * Responsible for sending back result set data to the connections made by external clients via the LLAP input format.
*/
public class LlapOutputFormatService {
@@ -75,6 +76,7 @@ public class LlapOutputFormatService {
private EventLoopGroup eventLoopGroup;
private ServerBootstrap serverBootstrap;
private ChannelFuture listeningChannelFuture;
+ private int port;
private LlapOutputFormatService() throws IOException {
writers = new HashMap<String, RecordWriter>();
@@ -92,17 +94,18 @@ public class LlapOutputFormatService {
public void start() throws IOException {
LOG.info("Starting LlapOutputFormatService");
- int port = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+ int portFromConf = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
eventLoopGroup = new NioEventLoopGroup(1);
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler());
try {
- LOG.info("LlapOutputFormatService: Binding to port " + port);
- listeningChannelFuture = serverBootstrap.bind(port).sync();
+ listeningChannelFuture = serverBootstrap.bind(portFromConf).sync();
+ this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort();
+ LOG.info("LlapOutputFormatService: Binding to port " + this.port);
} catch (InterruptedException err) {
- throw new IOException("LlapOutputFormatService: Error binding to port " + port, err);
+ throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err);
}
}
@@ -132,6 +135,10 @@ public class LlapOutputFormatService {
return writer;
}
+ public int getPort() {
+ return port;
+ }
+
protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
@@ -179,11 +186,11 @@ public class LlapOutputFormatService {
protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
- new StringDecoder(),
- new StringEncoder(),
- new LlapOutputFormatServiceHandler());
+ ch.pipeline().addLast(
+ new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
+ new StringDecoder(),
+ new StringEncoder(),
+ new LlapOutputFormatServiceHandler());
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 4e6272f..54fc3af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -93,7 +93,7 @@ public class HiveSplitGenerator extends InputInitializer {
this.work = work;
this.jobConf = new JobConf(conf);
- // TODO RSHACK - assuming grouping enabled always.
+ // Assuming grouping enabled always.
userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
@@ -106,7 +106,7 @@ public class HiveSplitGenerator extends InputInitializer {
// must be setup and initialized here so that it sets up it's structures to start accepting events.
// Setting it up in initialize leads to a window where events may come in before the pruner is
// initialized, which may cause it to drop events.
- // TODO RSHACK - No dynamic partition pruning
+ // No dynamic partition pruning
pruner = null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
index 6d00a0a..454c321 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
@@ -23,14 +23,11 @@ import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
implements RecordWriter {
- public static final Logger LOG = LoggerFactory.getLogger(HivePassThroughRecordWriter.class);
private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
@@ -45,7 +42,6 @@ implements RecordWriter {
@Override
public void close(boolean abort) throws IOException {
- LOG.info("Closing the pass through writer.");
//close with null reporter
mWriter.close(null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
index 81320a2..19e80f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
@@ -1277,17 +1277,14 @@ public class TypeCheckProcFactory {
try {
return getXpathOrFuncExprNodeDesc(expr, isFunction, children, ctx);
} catch (UDFArgumentTypeException e) {
- LOG.error("UDFArgumentTypeException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_TYPE.getMsg(expr
- .getChild(childrenBegin + e.getArgumentId()), e.getMessage()));
+ .getChild(childrenBegin + e.getArgumentId()), e.getMessage()), e);
} catch (UDFArgumentLengthException e) {
- LOG.error("UDFArgumentLengthException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_LENGTH.getMsg(
- expr, e.getMessage()));
+ expr, e.getMessage()), e);
} catch (UDFArgumentException e) {
- LOG.error("UDFArgumentException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT.getMsg(expr, e
- .getMessage()));
+ .getMessage()), e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 50cdadb..ce69ee6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -277,7 +277,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
DAG dag = DAG.create(work.getName());
dag.setCredentials(job.getCredentials());
- // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
DagUtils utils = DagUtils.getInstance();
Context ctx = new Context(job);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
index 5db8c48..6d31802 100644
--- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tez.dag.api;
import java.util.ArrayList;
@@ -25,7 +41,6 @@ public class TaskSpecBuilder {
List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> outputs =
vertex.getOutputs();
- // TODO RSHACK - for now these must be of size 1.
Preconditions.checkState(inputs.size() == 1);
Preconditions.checkState(outputs.size() == 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 37e21b8..907d5b0 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -113,7 +113,7 @@ public class TestLlapOutputFormat {
writer.close(null);
InputStream in = socket.getInputStream();
- LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class);
+ LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job);
LOG.debug("Have record reader");
http://git-wip-us.apache.org/repos/asf/hive/blob/2a03f1f4/ql/src/test/results/clientpositive/show_functions.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index 5c8b982..a811747 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -91,6 +91,7 @@ format_number
from_unixtime
from_utc_timestamp
get_json_object
+get_splits
greatest
hash
hex