You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:29 UTC
[11/18] flink git commit: [FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6
[FLINK-8966][tests] Port AvroExternalJarProgramITCase to flip6
This closes #5766.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a8dd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a8dd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a8dd06
Branch: refs/heads/master
Commit: f1a8dd0654030d6b4bd79732ef1d0f32c5f6820e
Parents: 3947a39
Author: zentol <ch...@apache.org>
Authored: Tue Apr 3 11:20:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:45 2018 +0200
----------------------------------------------------------------------
.../avro/AvroExternalJarProgramITCase.java | 75 +++++++---------
.../LegacyAvroExternalJarProgramITCase.java | 92 ++++++++++++++++++++
2 files changed, 124 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 985471a..6766947 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -19,74 +19,63 @@
package org.apache.flink.formats.avro;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.File;
-import java.net.URL;
import java.util.Collections;
/**
* IT case for the {@link AvroExternalJarProgram}.
*/
+@Category(New.class)
public class AvroExternalJarProgramITCase extends TestLogger {
private static final String JAR_FILE = "maven-test-jar.jar";
private static final String TEST_DATA_FILE = "/testdata.avro";
- @Test
- public void testExternalProgram() {
-
- LocalFlinkMiniCluster testMiniCluster = null;
+ private static final int PARALLELISM = 4;
- try {
- int parallelism = 4;
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
- testMiniCluster = new LocalFlinkMiniCluster(config, false);
- testMiniCluster.start();
+ private static final MiniCluster MINI_CLUSTER = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(PARALLELISM)
+ .build());
- String jarFile = JAR_FILE;
- String testData = getClass().getResource(TEST_DATA_FILE).toString();
+ @BeforeClass
+ public static void setUp() throws Exception {
+ MINI_CLUSTER.start();
+ }
- PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+ @AfterClass
+ public static void tearDown() {
+ TestEnvironment.unsetAsContext();
+ MINI_CLUSTER.closeAsync();
+ }
- TestEnvironment.setAsContext(
- testMiniCluster,
- parallelism,
- Collections.singleton(new Path(jarFile)),
- Collections.<URL>emptyList());
+ @Test
+ public void testExternalProgram() throws Exception {
+ TestEnvironment.setAsContext(
+ MINI_CLUSTER,
+ PARALLELISM,
+ Collections.singleton(new Path(JAR_FILE)),
+ Collections.emptyList());
- config.setString(JobManagerOptions.ADDRESS, "localhost");
- config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
- program.invokeInteractiveModeForExecution();
- }
- catch (Throwable t) {
- System.err.println(t.getMessage());
- t.printStackTrace();
- Assert.fail("Error during the packaged program execution: " + t.getMessage());
- }
- finally {
- TestEnvironment.unsetAsContext();
+ PackagedProgram program = new PackagedProgram(new File(JAR_FILE), new String[]{testData});
- if (testMiniCluster != null) {
- try {
- testMiniCluster.stop();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
+ program.invokeInteractiveModeForExecution();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1a8dd06/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1dd56a7
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+ private static final String JAR_FILE = "maven-test-jar.jar";
+
+ private static final String TEST_DATA_FILE = "/testdata.avro";
+
+ @Test
+ public void testExternalProgram() {
+
+ LocalFlinkMiniCluster testMiniCluster = null;
+
+ try {
+ int parallelism = 4;
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
+ testMiniCluster.start();
+
+ String jarFile = JAR_FILE;
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+ PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+ TestEnvironment.setAsContext(
+ testMiniCluster,
+ parallelism,
+ Collections.singleton(new Path(jarFile)),
+ Collections.<URL>emptyList());
+
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+ program.invokeInteractiveModeForExecution();
+ }
+ catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ Assert.fail("Error during the packaged program execution: " + t.getMessage());
+ }
+ finally {
+ TestEnvironment.unsetAsContext();
+
+ if (testMiniCluster != null) {
+ try {
+ testMiniCluster.stop();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+}