You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:29 UTC

[11/18] flink git commit: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6

[FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6

This closes #5766.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a8dd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a8dd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a8dd06

Branch: refs/heads/master
Commit: f1a8dd0654030d6b4bd79732ef1d0f32c5f6820e
Parents: 3947a39
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:20:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:45 2018 +0200

----------------------------------------------------------------------
 .../avro/AvroExternalJarProgramITCase.java      | 75 +++++++---------
 .../LegacyAvroExternalJarProgramITCase.java     | 92 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 985471a..6766947 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -19,74 +19,63 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.File;
-import java.net.URL;
 import java.util.Collections;
 
 /**
  * IT case for the {@link AvroExternalJarProgram}.
  */
+@Category(New.class)
 public class AvroExternalJarProgramITCase extends TestLogger {
 
 	private static final String JAR_FILE = "maven-test-jar.jar";
 
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
-	@Test
-	public void testExternalProgram() {
-
-		LocalFlinkMiniCluster testMiniCluster = null;
+	private static final int PARALLELISM = 4;
 
-		try {
-			int parallelism = 4;
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-			testMiniCluster = new LocalFlinkMiniCluster(config, false);
-			testMiniCluster.start();
+	private static final MiniCluster MINI_CLUSTER = new MiniCluster(
+		new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(PARALLELISM)
+			.build());
 
-			String jarFile = JAR_FILE;
-			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+	@BeforeClass
+	public static void setUp() throws Exception {
+		MINI_CLUSTER.start();
+	}
 
-			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+	@AfterClass
+	public static void tearDown() {
+		TestEnvironment.unsetAsContext();
+		MINI_CLUSTER.closeAsync();
+	}
 
-			TestEnvironment.setAsContext(
-				testMiniCluster,
-				parallelism,
-				Collections.singleton(new Path(jarFile)),
-				Collections.<URL>emptyList());
+	@Test
+	public void testExternalProgram() throws Exception {
+		TestEnvironment.setAsContext(
+			MINI_CLUSTER,
+			PARALLELISM,
+			Collections.singleton(new Path(JAR_FILE)),
+			Collections.emptyList());
 
-			config.setString(JobManagerOptions.ADDRESS, "localhost");
-			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+		String testData = getClass().getResource(TEST_DATA_FILE).toString();
 
-			program.invokeInteractiveModeForExecution();
-		}
-		catch (Throwable t) {
-			System.err.println(t.getMessage());
-			t.printStackTrace();
-			Assert.fail("Error during the packaged program execution: " + t.getMessage());
-		}
-		finally {
-			TestEnvironment.unsetAsContext();
+		PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[]{testData});
 
-			if (testMiniCluster != null) {
-				try {
-					testMiniCluster.stop();
-				} catch (Throwable t) {
-					// ignore
-				}
-			}
-		}
+		program.invokeInteractiveModeForExecution();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1dd56a7
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+	private static final String JAR_FILE = "maven-test-jar.jar";
+
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	@Test
+	public void testExternalProgram() {
+
+		LocalFlinkMiniCluster testMiniCluster = null;
+
+		try {
+			int parallelism = 4;
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
+			testMiniCluster.start();
+
+			String jarFile = JAR_FILE;
+			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+			TestEnvironment.setAsContext(
+				testMiniCluster,
+				parallelism,
+				Collections.singleton(new Path(jarFile)),
+				Collections.<URL>emptyList());
+
+			config.setString(JobManagerOptions.ADDRESS, "localhost");
+			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+			program.invokeInteractiveModeForExecution();
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			TestEnvironment.unsetAsContext();
+
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// ignore
+				}
+			}
+		}
+	}
+}