You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/03/26 22:13:18 UTC
hive git commit: HIVE-13324. LLAP: history log for FRAGMENT_START
doesn't log DagId correctly. (Siddharth Seth, Reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 2449d1dfe -> 3038b05ed
HIVE-13324. LLAP: history log for FRAGMENT_START doesn't log DagId correctly. (Siddharth Seth, Reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3038b05e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3038b05e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3038b05e
Branch: refs/heads/master
Commit: 3038b05ed346f4b5438e9072edb19186ea90d042
Parents: 2449d1d
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Mar 26 14:12:36 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Mar 26 14:12:36 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/llap/tez/Converters.java | 1 +
.../hadoop/hive/llap/tez/TestConverters.java | 190 +++++++++++++++++++
2 files changed, 191 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3038b05e/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index a5c3631..ec6e439 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -85,6 +85,7 @@ public class Converters {
FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString());
builder.setDagName(taskSpec.getDAGName());
+ builder.setDagId(taskSpec.getDagIdentifier());
builder.setVertexName(taskSpec.getVertexName());
builder.setVertexParallelism(taskSpec.getVertexParallelism());
builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/3038b05e/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
new file mode 100644
index 0000000..d4cdac1
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Test;
+
+public class TestConverters {
+
+ @Test(timeout = 5000)
+ public void testTaskSpecToFragmentSpec() {
+ ByteBuffer procBb = ByteBuffer.allocate(4);
+ procBb.putInt(0, 200);
+ UserPayload processorPayload = UserPayload.create(procBb);
+ ProcessorDescriptor processorDescriptor =
+ ProcessorDescriptor.create("fakeProcessorName").setUserPayload(processorPayload);
+
+ ByteBuffer input1Bb = ByteBuffer.allocate(4);
+ input1Bb.putInt(0, 300);
+ UserPayload input1Payload = UserPayload.create(input1Bb);
+ InputDescriptor id1 = InputDescriptor.create("input1ClassName").setUserPayload(input1Payload);
+ InputSpec inputSpec1 = new InputSpec("sourceVertexName1", id1, 33);
+ InputSpec inputSpec2 = new InputSpec("sourceVertexName2", id1, 44);
+ List<InputSpec> inputSpecList = Lists.newArrayList(inputSpec1, inputSpec2);
+
+ ByteBuffer output1Bb = ByteBuffer.allocate(4);
+ output1Bb.putInt(0, 400);
+ UserPayload output1Payload = UserPayload.create(output1Bb);
+ OutputDescriptor od1 =
+ OutputDescriptor.create("output1ClassName").setUserPayload(output1Payload);
+ OutputSpec outputSpec1 = new OutputSpec("destVertexName1", od1, 55);
+ OutputSpec outputSpec2 = new OutputSpec("destVertexName2", od1, 66);
+ List<OutputSpec> outputSpecList = Lists.newArrayList(outputSpec1, outputSpec2);
+
+ ApplicationId appId = ApplicationId.newInstance(1000, 100);
+ TezDAGID tezDagId = TezDAGID.getInstance(appId, 300);
+ TezVertexID tezVertexId = TezVertexID.getInstance(tezDagId, 400);
+ TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500);
+ TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600);
+
+ TaskSpec taskSpec =
+ new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor,
+ inputSpecList, outputSpecList, null);
+
+
+ FragmentSpecProto fragmentSpecProto = Converters.convertTaskSpecToProto(taskSpec);
+
+
+ assertEquals("dagName", fragmentSpecProto.getDagName());
+ assertEquals("vertexName", fragmentSpecProto.getVertexName());
+ assertEquals(tezTaskAttemptId.toString(), fragmentSpecProto.getFragmentIdentifierString());
+ assertEquals(tezDagId.getId(), fragmentSpecProto.getDagId());
+ assertEquals(tezTaskAttemptId.getId(), fragmentSpecProto.getAttemptNumber());
+ assertEquals(tezTaskId.getId(), fragmentSpecProto.getFragmentNumber());
+ assertEquals(processorDescriptor.getClassName(),
+ fragmentSpecProto.getProcessorDescriptor().getClassName());
+ assertEquals(processorDescriptor.getUserPayload().getPayload(),
+ fragmentSpecProto.getProcessorDescriptor().getUserPayload().getUserPayload()
+ .asReadOnlyByteBuffer());
+ assertEquals(2, fragmentSpecProto.getInputSpecsCount());
+ assertEquals(2, fragmentSpecProto.getOutputSpecsCount());
+
+ verifyInputSpecAndProto(inputSpec1, fragmentSpecProto.getInputSpecs(0));
+ verifyInputSpecAndProto(inputSpec2, fragmentSpecProto.getInputSpecs(1));
+ verifyOutputSpecAndProto(outputSpec1, fragmentSpecProto.getOutputSpecs(0));
+ verifyOutputSpecAndProto(outputSpec2, fragmentSpecProto.getOutputSpecs(1));
+
+ }
+
+ @Test (timeout = 5000)
+ public void testFragmentSpecToTaskSpec() {
+
+ ByteBuffer procBb = ByteBuffer.allocate(4);
+ procBb.putInt(0, 200);
+
+ ByteBuffer input1Bb = ByteBuffer.allocate(4);
+ input1Bb.putInt(0, 300);
+
+ ByteBuffer output1Bb = ByteBuffer.allocate(4);
+ output1Bb.putInt(0, 400);
+
+ ApplicationId appId = ApplicationId.newInstance(1000, 100);
+ TezDAGID tezDagId = TezDAGID.getInstance(appId, 300);
+ TezVertexID tezVertexId = TezVertexID.getInstance(tezDagId, 400);
+ TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500);
+ TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600);
+
+ FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
+ builder.setFragmentIdentifierString(tezTaskAttemptId.toString());
+ builder.setDagName("dagName");
+ builder.setVertexName("vertexName");
+ builder.setDagId(tezDagId.getId());
+ builder.setProcessorDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("fakeProcessorName").setUserPayload(
+ UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(procBb))));
+ builder.addInputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("sourceVertexName1")
+ .setPhysicalEdgeCount(33).setIoDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("input1ClassName").setUserPayload(
+ UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(input1Bb)))));
+ builder.addInputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("sourceVertexName2")
+ .setPhysicalEdgeCount(44).setIoDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("input1ClassName").setUserPayload(
+ UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(input1Bb)))));
+ builder.addOutputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("destVertexName1")
+ .setPhysicalEdgeCount(55).setIoDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload(
+ UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb)))));
+ builder.addOutputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("destVertexName2")
+ .setPhysicalEdgeCount(66).setIoDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload(
+ UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb)))));
+
+ FragmentSpecProto fragmentSpecProto = builder.build();
+
+ TaskSpec taskSpec = Converters.getTaskSpecfromProto(fragmentSpecProto);
+
+ assertEquals("dagName", taskSpec.getDAGName());
+ assertEquals("vertexName", taskSpec.getVertexName());
+ assertEquals(tezTaskAttemptId, taskSpec.getTaskAttemptID());
+ assertEquals("fakeProcessorName", taskSpec.getProcessorDescriptor().getClassName());
+ byte[] serialized = new byte[taskSpec.getProcessorDescriptor().getUserPayload().getPayload().remaining()];
+ taskSpec.getProcessorDescriptor().getUserPayload().getPayload().get(serialized);
+ assertArrayEquals(procBb.array(), serialized);
+
+ assertEquals(2, taskSpec.getInputs().size());
+ assertEquals(2, taskSpec.getOutputs().size());
+
+ verifyInputSpecAndProto(taskSpec.getInputs().get(0), fragmentSpecProto.getInputSpecs(0));
+ verifyInputSpecAndProto(taskSpec.getInputs().get(1), fragmentSpecProto.getInputSpecs(1));
+ verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), fragmentSpecProto.getOutputSpecs(0));
+ verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), fragmentSpecProto.getOutputSpecs(1));
+
+
+ }
+
+ private void verifyInputSpecAndProto(InputSpec inputSpec,
+ IOSpecProto inputSpecProto) {
+ assertEquals(inputSpec.getPhysicalEdgeCount(), inputSpecProto.getPhysicalEdgeCount());
+ assertEquals(inputSpec.getSourceVertexName(), inputSpecProto.getConnectedVertexName());
+ assertEquals(inputSpec.getInputDescriptor().getClassName(),
+ inputSpecProto.getIoDescriptor().getClassName());
+ assertEquals(inputSpec.getInputDescriptor().getUserPayload().getPayload(),
+ inputSpecProto.getIoDescriptor().getUserPayload().getUserPayload().asReadOnlyByteBuffer());
+ }
+
+ private void verifyOutputSpecAndProto(OutputSpec outputSpec,
+ IOSpecProto outputSpecProto) {
+ assertEquals(outputSpec.getPhysicalEdgeCount(), outputSpecProto.getPhysicalEdgeCount());
+ assertEquals(outputSpec.getDestinationVertexName(), outputSpecProto.getConnectedVertexName());
+ assertEquals(outputSpec.getOutputDescriptor().getClassName(),
+ outputSpecProto.getIoDescriptor().getClassName());
+ assertEquals(outputSpec.getOutputDescriptor().getUserPayload().getPayload(),
+ outputSpecProto.getIoDescriptor().getUserPayload().getUserPayload().asReadOnlyByteBuffer());
+ }
+}