You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [24/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java Wed Feb 22 09:43:41 2017
@@ -117,15 +117,15 @@ public class TestTezGraceParallelism {
Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
try {
// DAG: 47 \
- // -> 49(join) -> 52(distinct) -> 61(group)
+ // -> 49(join) -> 52(distinct) -> 56(group)
// 48 /
// Parallelism at compile time:
// DAG: 47(1) \
- // -> 49(2) -> 52(20) -> 61(200)
+ // -> 49(2) -> 52(20) -> 56(200)
// 48(1) /
// However, when 49 finishes, the actual output of 49 only justify parallelism 1.
- // We adjust the parallelism for 61 to 100 based on this.
- // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 100 to 1.
+ // We adjust the parallelism for 56 to 7 based on this.
+ // At runtime, ShuffleVertexManager still kicks in and further reduces parallelism from 7 to 1.
//
pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -140,10 +140,10 @@ public class TestTezGraceParallelism {
"('F',1349L)", "('M',1373L)"});
Util.checkQueryOutputsAfterSort(iter, expectedResults);
assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7"));
assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
- assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7"));
} finally {
Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
@@ -217,8 +217,8 @@ public class TestTezGraceParallelism {
count++;
}
assertEquals(count, 20);
- assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
+ assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10"));
} finally {
Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
}
@@ -262,9 +262,9 @@ public class TestTezGraceParallelism {
StringWriter writer = new StringWriter();
Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class);
try {
- // DAG: 29 -> 32 -> 41 \
- // -> 70 (vertex group) -> 61
- // 42 -> 45 -> 54 /
+ // DAG: 29 -> 32 -> 36 \
+ // -> 55 (vertex group) -> 51
+ // 37 -> 40 -> 44 /
pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = group B by name;");
@@ -280,8 +280,8 @@ public class TestTezGraceParallelism {
count++;
}
assertEquals(count, 20);
- assertTrue(writer.toString().contains("time to set parallelism for scope-41"));
- assertTrue(writer.toString().contains("time to set parallelism for scope-54"));
+ assertTrue(writer.toString().contains("time to set parallelism for scope-36"));
+ assertTrue(writer.toString().contains("time to set parallelism for scope-44"));
} finally {
Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class);
}
@@ -322,4 +322,33 @@ public class TestTezGraceParallelism {
super.setStoreLocation(location, job);
}
}
+
+ /**
+ * Verifies that the vertex doing the cross is not managed by
+ * PigGraceShuffleVertexManager. See PIG-4786.
+ * <p>
+ * After the id/scope reset below the cross vertex is expected to be scope-90, so the
+ * check is that the captured PigGraceShuffleVertexManager log never mentions it.
+ */
+ @Test
+ public void testCross() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testCross", writer, PigGraceShuffleVertexManager.class);
+ try {
+ // NOTE(review): these two properties are not restored afterwards; confirm that no
+ // later test relies on their previous values.
+ pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize", "3000");
+ pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("B = order A by name;");
+ pigServer.registerQuery("C = distinct B;");
+ pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = group D by name;");
+ pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+ pigServer.registerQuery("G = cross C, F;");
+ Iterator<Tuple> iter = pigServer.openIterator("G");
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ // assertEquals takes (expected, actual); the order only matters for the failure message.
+ assertEquals(400, count);
+ assertFalse(writer.toString().contains("scope-90"));
+ } finally {
+ // Detach the appender even when the query or an assertion fails, as the other
+ // tests in this class do, so it does not keep capturing log output.
+ Util.removeLogAppender("testCross", PigGraceShuffleVertexManager.class);
+ }
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Wed Feb 22 09:43:41 2017
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,17 +31,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -48,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
@@ -57,8 +63,11 @@ import org.apache.pig.test.junit.Ordered
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -77,7 +86,8 @@ import org.junit.runner.RunWith;
"testTezParallelismEstimatorFilterFlatten",
"testTezParallelismEstimatorHashJoin",
"testTezParallelismEstimatorSplitBranch",
- "testTezParallelismDefaultParallelism"
+ "testTezParallelismDefaultParallelism",
+ "testShuffleVertexManagerConfig"
})
public class TestTezJobControlCompiler {
private static PigContext pc;
@@ -89,6 +99,7 @@ public class TestTezJobControlCompiler {
public static void setUpBeforeClass() throws Exception {
input1 = Util.createTempFileDelOnExit("input1", "txt").toURI();
input2 = Util.createTempFileDelOnExit("input2", "txt").toURI();
+ FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
}
@AfterClass
@@ -107,7 +118,7 @@ public class TestTezJobControlCompiler {
"a = load '" + input1 +"' as (x:int, y:int);" +
"b = filter a by x > 0;" +
"c = foreach b generate y;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
@@ -127,7 +138,7 @@ public class TestTezJobControlCompiler {
"a = load '" + input1 +"' as (x:int, y:int);" +
"b = group a by x;" +
"c = foreach b generate group, a;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
@@ -159,7 +170,7 @@ public class TestTezJobControlCompiler {
"b = load '" + input2 +"' as (x:int, z:int);" +
"c = join a by x, b by x;" +
"d = foreach c generate a::x as x, y, z;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
@@ -289,6 +300,72 @@ public class TestTezJobControlCompiler {
TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 5);
+ pc.defaultParallel = -1;
+ }
+
+ /**
+ * Compiles the same script under three parallelism settings and checks, for the leaf
+ * vertex, which vertex manager plugin class is configured and what its payload
+ * configuration contains:
+ * <ol>
+ * <li>PIG_TEZ_GRACE_PARALLELISM left unset: PigGraceShuffleVertexManager, 7 entries;</li>
+ * <li>PIG_TEZ_GRACE_PARALLELISM set to false: ShuffleVertexManager, 4 entries;</li>
+ * <li>additionally pc.defaultParallel set to 2: ShuffleVertexManager, 2 entries.</li>
+ * </ol>
+ * Each case builds on the settings of the previous one, so the order matters.
+ * Everything changed on {@code pc} is undone in the finally block.
+ */
+ @Test
+ public void testShuffleVertexManagerConfig() throws Exception{
+ pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3");
+ pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500");
+
+ try {
+
+ String script = "a = load '10' using " + ArbitarySplitsLoader.class.getName()
+ + "() as (name:chararray, age:int, gpa:double);"
+ + "b = limit a 5;"
+ + "c = group b by name;"
+ + "store c into 'output';";
+
+ // Case 1: grace auto parallelism (PigGraceShuffleVertexManager)
+ VertexManagerPluginDescriptor plugin = getLeafVertexVMPlugin(script);
+ Configuration pluginConf = TezUtils.createConfFromUserPayload(plugin.getUserPayload());
+ assertEquals(PigGraceShuffleVertexManager.class.getName(), plugin.getClassName());
+ // Entries: min and max src fraction, auto parallel, desired size,
+ // bytes.per.reducer, pig.tez.plan and pigcontext
+ assertEquals(7, pluginConf.size());
+ assertSrcFractions("0.3", pluginConf);
+ assertAutoParallel("500", pluginConf);
+ assertEquals("500", pluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
+
+ // Case 2: plain auto parallelism (ShuffleVertexManager)
+ pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
+ plugin = getLeafVertexVMPlugin(script);
+ pluginConf = TezUtils.createConfFromUserPayload(plugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(), plugin.getClassName());
+ // Entries: min and max src fraction, auto parallel, desired size
+ assertEquals(4, pluginConf.size());
+ assertSrcFractions("0.3", pluginConf);
+ assertAutoParallel("500", pluginConf);
+
+ // Case 3: default parallel or PARALLEL (ShuffleVertexManager)
+ pc.defaultParallel = 2;
+ plugin = getLeafVertexVMPlugin(script);
+ pluginConf = TezUtils.createConfFromUserPayload(plugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(), plugin.getClassName());
+ // Entries: min and max src fraction only
+ assertEquals(2, pluginConf.size());
+ assertSrcFractions("0.3", pluginConf);
+ } finally {
+ pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+ pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
+ pc.defaultParallel = -1;
+ }
+ }
+
+ /** Asserts that both the min and the max source fraction in {@code conf} equal {@code expected}. */
+ private static void assertSrcFractions(String expected, Configuration conf) {
+ assertEquals(expected, conf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals(expected, conf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ }
+
+ /** Asserts that {@code conf} enables auto parallelism with the given desired task input size. */
+ private static void assertAutoParallel(String desiredSize, Configuration conf) {
+ assertEquals("true", conf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+ assertEquals(desiredSize, conf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+ }
+
+ /**
+ * Compiles {@code query} and returns the vertex manager plugin descriptor of the DAG
+ * vertex named after the first leaf operator of the compiled Tez plan.
+ *
+ * @param query the Pig script to compile
+ * @return the value of the leaf vertex's {@code vertexManagerPlugin} field
+ * @throws Exception if compilation or the reflective field access fails
+ */
+ private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception {
+ Pair<TezOperPlan, DAG> planAndDag = compile(query);
+ String leafVertexName = planAndDag.first.getLeaves().get(0).getOperatorKey().toString();
+ Vertex leaf = planAndDag.second.getVertex(leafVertexName);
+ // The descriptor is read reflectively from Vertex; setAccessible is required,
+ // so the field is presumably not public.
+ Field pluginField = Vertex.class.getDeclaredField("vertexManagerPlugin");
+ pluginField.setAccessible(true);
+ return (VertexManagerPluginDescriptor) pluginField.get(leaf);
}
private Pair<TezOperPlan, DAG> compile(String query) throws Exception {
Added: pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java (added)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tez-specific behaviour tests that run small scripts in {@code tez_local} mode
+ * against a six-line local input file ("1" three times, "2" three times).
+ */
+public class TestTezJobExecution {
+
+ private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class);
+
+ private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + "input";
+ private PigServer pigServer;
+
+ /** Recreates the test directory from scratch and writes the shared input file. */
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ Util.deleteDirectory(new File(TEST_DIR));
+ new File(TEST_DIR).mkdirs();
+ Util.createLocalInputFile(INPUT_FILE, new String[] {
+ "1", "1", "1", "2", "2", "2"
+ });
+ }
+
+ /** Removes the test directory together with every output written by the tests. */
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ Util.deleteDirectory(new File(TEST_DIR));
+ }
+
+ /** Gives each test its own PigServer running in tez_local mode. */
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer("tez_local");
+ }
+
+ /**
+ * Stores the UNION of the input with itself using PARALLEL 2 and checks how the
+ * records are split over the two part files: all the 2s are expected in part 0
+ * and all the 1s in part 1.
+ */
+ @Test
+ public void testUnionParallelHashValuePartition() throws IOException {
+ String output = TEST_DIR + Path.SEPARATOR + "output1";
+ String query = "A = LOAD '" + INPUT_FILE + "';"
+ + "B = LOAD '" + INPUT_FILE + "';"
+ + "C = UNION A, B PARALLEL 2;"
+ + "STORE C into '" + output + "';";
+ pigServer.registerQuery(query);
+ // Decode with an explicit charset so the comparison does not depend on the
+ // platform default encoding of the machine running the test.
+ String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000"), "UTF-8");
+ String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001"), "UTF-8");
+ assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+ assertEquals("1\n1\n1\n1\n1\n1\n", part1);
+ }
+
+ /**
+ * Runs a script made of two parts separated by {@code exec} (a GROUP with
+ * PARALLEL 1, then a GROUP without PARALLEL) through PigRunner twice and counts
+ * the distinct job ids reported to the listener: one with default settings, two
+ * when PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY is set to true.
+ * <p>
+ * NOTE(review): the method name says "Discovery" while the property and the script
+ * file name refer to DAG recovery; the name is kept as committed.
+ */
+ @Test
+ public void testDAGDiscoveryDisabled() throws IOException {
+ String output1 = TEST_DIR + Path.SEPARATOR + "output-parallel";
+ String output2 = TEST_DIR + Path.SEPARATOR + "output-autoparallel";
+ String scriptFile = TEST_DIR + Path.SEPARATOR + "testDAGRecoveryDisable.pig";
+ String query = "A = LOAD '" + INPUT_FILE + "';"
+ + "B = GROUP A BY $0 PARALLEL 1;"
+ + "STORE B into '" + output1 + "';"
+ + "exec;"
+ + "C = LOAD '" + INPUT_FILE + "';"
+ + "D = GROUP C BY $0;"
+ + "STORE D into '" + output2 + "';";
+
+ Util.createLocalInputFile(scriptFile, new String[] {query});
+
+ String[] args = { "-x", "tez_local", scriptFile };
+
+ TestNotificationListener listener = new TestNotificationListener();
+ // Recovery is not disabled when there is auto parallelism. Should reuse AM application session
+ PigStats stats = PigRunner.run(args, listener);
+ assertTrue(stats.isSuccessful());
+ assertEquals(1, listener.getJobsStarted().size());
+
+ // Clear both outputs so the second run can store to the same locations.
+ Util.deleteFile(pigServer.getPigContext(), output1);
+ Util.deleteFile(pigServer.getPigContext(), output2);
+
+ // Recovery is disabled when there is auto parallelism. Should use two different AMs
+ listener.reset();
+ args = new String[] {
+ "-D" + PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY + "=true",
+ "-x",
+ "tez_local",
+ scriptFile };
+ stats = PigRunner.run(args, listener);
+ assertTrue(stats.isSuccessful());
+ assertEquals(2, listener.getJobsStarted().size());
+ }
+
+
+ /**
+ * Listener that only records the distinct job ids passed to
+ * {@link #jobStartedNotification}; every other callback is a no-op.
+ */
+ private static class TestNotificationListener implements PigProgressNotificationListener {
+
+ // The set is cleared in place by reset(), never replaced, so the reference can be final.
+ private final Set<String> jobsStarted = new HashSet<String>();
+
+ /** Forgets all job ids recorded so far. */
+ public void reset() {
+ this.jobsStarted.clear();
+ }
+
+ /** Returns the live set of job ids recorded since construction or the last reset. */
+ public Set<String> getJobsStarted() {
+ return jobsStarted;
+ }
+
+ @Override
+ public void initialPlanNotification(String scriptId,
+ OperatorPlan<?> plan) {
+ }
+
+ @Override
+ public void launchStartedNotification(String scriptId,
+ int numJobsToLaunch) {
+ }
+
+ @Override
+ public void jobsSubmittedNotification(String scriptId,
+ int numJobsSubmitted) {
+ }
+
+ @Override
+ public void jobStartedNotification(String scriptId, String assignedJobId) {
+ jobsStarted.add(assignedJobId);
+ }
+
+ @Override
+ public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+ }
+
+ @Override
+ public void jobFailedNotification(String scriptId, JobStats jobStats) {
+ }
+
+ @Override
+ public void outputCompletedNotification(String scriptId,
+ OutputStats outputStats) {
+ }
+
+ @Override
+ public void progressUpdatedNotification(String scriptId, int progress) {
+
+ }
+
+ @Override
+ public void launchCompletedNotification(String scriptId,
+ int numJobsSucceeded) {
+ }
+
+ }
+
+}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java Wed Feb 22 09:43:41 2017
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
@@ -35,6 +34,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -124,11 +124,11 @@ public class TestTezLauncher {
@Test
public void testQueueName() throws Exception {
- Configuration conf = new Configuration();
+ TezConfiguration conf = new TezConfiguration();
conf.set("tez.queue.name", "special");
- conf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ MRToTezHelper.translateMRSettingsForTezAM(conf);
assertEquals(conf.get("tez.queue.name"), "special");
-
+
}
}
Modified: pig/branches/spark/test/perf/pigmix/bin/generate_data.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/bin/generate_data.sh?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/perf/pigmix/bin/generate_data.sh (original)
+++ pig/branches/spark/test/perf/pigmix/bin/generate_data.sh Wed Feb 22 09:43:41 2017
@@ -25,20 +25,11 @@ fi
source $PIGMIX_HOME/conf/config.sh
-if [ $HADOOP_VERSION == "23" ]; then
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
-else
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
-fi
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
shopt -s extglob
-if [ $HADOOP_VERSION == "23" ]; then
- pigjar=`echo $PIG_HOME/pig*-h2.jar`
-else
- pigjar=`echo $PIG_HOME/pig*-h1.jar`
-fi
+pigjar=`echo $PIG_HOME/pig*-h2.jar`
pigmixjar=$PIGMIX_HOME/pigmix.jar
Modified: pig/branches/spark/test/perf/pigmix/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/build.xml?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/perf/pigmix/build.xml (original)
+++ pig/branches/spark/test/perf/pigmix/build.xml Wed Feb 22 09:43:41 2017
@@ -34,6 +34,8 @@
</fileset>
</path>
+ <property name="hadoopversion" value="2" />
+
<property name="java.dir" value="${basedir}/src/java"/>
<property name="pigmix.build.dir" value="${basedir}/build"/>
<property name="pigmix.jar" value="${basedir}/pigmix.jar"/>