You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/11/19 19:17:40 UTC
tez git commit: TEZ-2952. NPE in TestOnFileUnorderedKVOutput (bikas)
Repository: tez
Updated Branches:
refs/heads/master c07f284ac -> 4561b8252
TEZ-2952. NPE in TestOnFileUnorderedKVOutput (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4561b825
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4561b825
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4561b825
Branch: refs/heads/master
Commit: 4561b82524ca6ee484910349a6c95703883757b6
Parents: c07f284
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Nov 19 10:17:22 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Nov 19 10:17:22 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/api/impl/TestProcessorContext.java | 20 +++++++++----
.../output/TestOnFileUnorderedKVOutput.java | 31 ++++++++++++--------
3 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea88aff..9644a1e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators.
ALL CHANGES:
+ TEZ-2952. NPE in TestOnFileUnorderedKVOutput
TEZ-2480. Exception when closing output is ignored.
TEZ-2944. NPE in TestProcessorContext.
TEZ-2945. TEZ-2740 addendum to update API with currently supported parameters
http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index 40bc257..f0c1e66 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -16,17 +16,17 @@ package org.apache.tez.runtime.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,7 +43,7 @@ import org.junit.Test;
public class TestProcessorContext {
@Test (timeout = 5000)
- public void testDagNumber() {
+ public void testDagNumber() throws IOException {
String[] localDirs = new String[] {"dummyLocalDir"};
int appAttemptNumber = 1;
TezUmbilical tezUmbilical = mock(TezUmbilical.class);
@@ -57,8 +57,14 @@ public class TestProcessorContext {
TezTaskID taskId = TezTaskID.getInstance(vertexId, 4);
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
- LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class);
- doReturn(new TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class));
+ TaskSpec mockSpec = mock(TaskSpec.class);
+ when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+ when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+ LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(
+ mockSpec, 1,
+ new Configuration(), new String[]{"/"},
+ tezUmbilical, null, null, null, null, "", null, 1024, false);
+ LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
Map<String, String> auxServiceEnv = Maps.newHashMap();
MemoryDistributor memDist = mock(MemoryDistributor.class);
@@ -96,5 +102,9 @@ public class TestProcessorContext {
assertEquals(vertexName, procContext.getTaskVertexName());
assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
+
+ // test auto call of notifyProgress
+ procContext.setProgress(0.1f);
+ verify(mockTask, times(1)).notifyProgressInvocation();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 2b25daf..32a3619 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
@@ -62,7 +62,9 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskStatistics;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -91,7 +93,7 @@ public class TestOnFileUnorderedKVOutput {
private static Path workDir = null;
private static final int shufflePort = 2112;
- TaskStatistics stats;
+ LogicalIOProcessorRuntimeTask task;
static {
defaultConf.set("fs.defaultFS", "file:///");
@@ -108,7 +110,6 @@ public class TestOnFileUnorderedKVOutput {
@Before
public void setup() throws Exception {
- stats = new TaskStatistics();
localFs.mkdirs(workDir);
}
@@ -139,8 +140,8 @@ public class TestOnFileUnorderedKVOutput {
}
events = kvOutput.close();
- assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize());
- assertEquals(5, stats.getIOStatistics().values().iterator().next().getItemsProcessed());
+ assertEquals(45, task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize());
+ assertEquals(5, task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed());
assertTrue(events != null && events.size() == 1);
CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
@@ -212,12 +213,18 @@ public class TestOnFileUnorderedKVOutput {
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
- TezCounters counters = new TezCounters();
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
- LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class);
- when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters);
- when(runtimeTask.getTaskStatistics()).thenReturn(stats);
-
+
+ TaskSpec mockSpec = mock(TaskSpec.class);
+ when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+ when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+ task = new LogicalIOProcessorRuntimeTask(
+ mockSpec, appAttemptNumber,
+ new Configuration(), new String[]{"/"},
+ tezUmbilical, null, null, null, null, "", null, 1024, false);
+
+ LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
+
Map<String, String> auxEnv = new HashMap<String, String>();
ByteBuffer bb = ByteBuffer.allocate(4);
bb.putInt(shufflePort);
@@ -236,7 +243,7 @@ public class TestOnFileUnorderedKVOutput {
verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
verify(runtimeTask, times(1)).getTaskStatistics();
// verify output stats object got created
- Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName));
+ Assert.assertTrue(task.getTaskStatistics().getIOStatistics().containsKey(destinationVertexName));
OutputContext outputContext = spy(realOutputContext);
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {