Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:05 UTC
[49/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
deleted file mode 100644
index 7d127ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
-* Tests for {@link RollingSink}.
-*
-* <p>
-* This test only verifies the exactly-once behaviour of the sink. A separate test verifies
-* the rolling behaviour.
-*
-* <p>
-* This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely
-* high. This provokes the case where the sink restarts without any checkpoint having been
-* performed, and thereby exercises the initial cleanup of pending/in-progress files.
-*/
-public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
-
- final long NUM_STRINGS = 16_000;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- private static MiniDFSCluster hdfsCluster;
- private static org.apache.hadoop.fs.FileSystem dfs;
-
- private static String outPath;
-
-
-
- @BeforeClass
- public static void createHDFS() throws IOException {
- Configuration conf = new Configuration();
-
- File dataDir = tempFolder.newFolder();
-
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
- hdfsCluster = builder.build();
-
- dfs = hdfsCluster.getFileSystem();
-
- outPath = "hdfs://"
- + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
- + "/string-non-rolling-out-no-checkpoint";
- }
-
- @AfterClass
- public static void destroyHDFS() {
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
- }
-
-
- @Override
- public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- int PARALLELISM = 6;
-
- env.enableCheckpointing(Long.MAX_VALUE);
- env.setParallelism(PARALLELISM);
- env.disableOperatorChaining();
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
- DataStream<String> mapped = stream
- .map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
- RollingSink<String> sink = new RollingSink<String>(outPath)
- .setBucketer(new NonRollingBucketer())
- .setBatchSize(5000)
- .setValidLengthPrefix("")
- .setPendingPrefix("");
-
- mapped.addSink(sink);
-
- }
-
- @Override
- public void postSubmit() throws Exception {
- // We read the files and verify that we have read all the strings. If a valid-length
- // file exists we only read the file to that point. (This test should work with
- // FileSystems that support truncate() and with others as well.)
-
- Pattern messageRegex = Pattern.compile("message (\\d*)");
-
- // Keep a set of the message IDs that we read. Both the set size and the read count
- // must equal NUM_STRINGS; if numRead is bigger than the size of the set, we have
- // seen some elements twice.
- Set<Integer> readNumbers = Sets.newHashSet();
- int numRead = 0;
-
- RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
- outPath), true);
-
- while (files.hasNext()) {
- LocatedFileStatus file = files.next();
-
- if (!file.getPath().toString().endsWith(".valid-length")) {
- int validLength = (int) file.getLen();
- if (dfs.exists(file.getPath().suffix(".valid-length"))) {
- FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
- String validLengthString = inStream.readUTF();
- validLength = Integer.parseInt(validLengthString);
- System.out.println("VALID LENGTH: " + validLength);
- }
- FSDataInputStream inStream = dfs.open(file.getPath());
- byte[] buffer = new byte[validLength];
- inStream.readFully(0, buffer, 0, validLength);
- inStream.close();
-
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
- InputStreamReader inStreamReader = new InputStreamReader(bais);
- BufferedReader br = new BufferedReader(inStreamReader);
-
- String line = br.readLine();
- while (line != null) {
- Matcher matcher = messageRegex.matcher(line);
- if (matcher.matches()) {
- numRead++;
- int messageId = Integer.parseInt(matcher.group(1));
- readNumbers.add(messageId);
- } else {
- Assert.fail("Read line does not match expected pattern.");
- }
- line = br.readLine();
- }
- br.close();
- inStreamReader.close();
- bais.close();
- }
- }
-
- // Verify that we read all strings (at-least-once)
- Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
- // Verify that we don't have duplicates (boom!, exactly-once)
- Assert.assertEquals(NUM_STRINGS, numRead);
- }
-
- private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- private static volatile boolean hasFailed = false;
-
- private final long numElements;
-
- private long failurePos;
- private long count;
-
-
- OnceFailingIdentityMapper(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
- long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
- long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
- failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
- count = 0;
- }
-
- @Override
- public String map(String value) throws Exception {
- count++;
- if (!hasFailed && count >= failurePos) {
- hasFailed = true;
- throw new Exception("Test Failure");
- }
-
- return value;
- }
- }
-
- private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- private final long numElements;
-
- private int index;
-
- private volatile boolean isRunning = true;
-
-
- StringGeneratingSourceFunction(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- final Object lockingObject = ctx.getCheckpointLock();
-
- final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
-
- while (isRunning && index < numElements) {
-
- Thread.sleep(1);
- synchronized (lockingObject) {
- ctx.collect("message " + index);
- index += step;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- private static String randomString(StringBuilder bld, Random rnd) {
- final int len = rnd.nextInt(10) + 5;
-
- for (int i = 0; i < len; i++) {
- char next = (char) (rnd.nextInt(20000) + 33);
- bld.append(next);
- }
-
- return bld.toString();
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
- }
-
- @Override
- public void restoreState(Integer state) {
- index = state;
- }
- }
-}
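
The postSubmit() check above doubles as the read contract for any consumer of
RollingSink output: when a ".valid-length" companion file exists next to a part
file, only that many bytes of the part file are valid. A minimal sketch of that
read path, distilled from the test (the helper name readValidPart is
illustrative, not part of the connector API):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Reads a part file written by RollingSink, honouring the ".valid-length"
    // companion file when present (illustrative helper, not connector API).
    static byte[] readValidPart(FileSystem fs, Path part) throws IOException {
        int validLength = (int) fs.getFileStatus(part).getLen();
        Path lengthFile = part.suffix(".valid-length");
        if (fs.exists(lengthFile)) {
            try (FSDataInputStream in = fs.open(lengthFile)) {
                // the sink records the valid byte count via writeUTF()
                validLength = Integer.parseInt(in.readUTF());
            }
        }
        byte[] buffer = new byte[validLength];
        try (FSDataInputStream in = fs.open(part)) {
            in.readFully(0, buffer, 0, validLength);
        }
        return buffer;
    }
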
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
deleted file mode 100644
index 65904d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
- *
- * <p>
- * This test only verifies the exactly-once behaviour of the sink. A separate test verifies
- * the rolling behaviour.
- */
-public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
-
- final long NUM_STRINGS = 16_000;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- private static MiniDFSCluster hdfsCluster;
- private static org.apache.hadoop.fs.FileSystem dfs;
-
- private static String outPath;
-
-
-
- @BeforeClass
- public static void createHDFS() throws IOException {
- Configuration conf = new Configuration();
-
- File dataDir = tempFolder.newFolder();
-
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
- hdfsCluster = builder.build();
-
- dfs = hdfsCluster.getFileSystem();
-
- outPath = "hdfs://"
- + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
- + "/string-non-rolling-out";
- }
-
- @AfterClass
- public static void destroyHDFS() {
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
- }
-
-
- @Override
- public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
- int PARALLELISM = 6;
-
- env.enableCheckpointing(200);
- env.setParallelism(PARALLELISM);
- env.disableOperatorChaining();
-
- DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
- DataStream<String> mapped = stream
- .map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
- RollingSink<String> sink = new RollingSink<String>(outPath)
- .setBucketer(new NonRollingBucketer())
- .setBatchSize(10000)
- .setValidLengthPrefix("")
- .setPendingPrefix("");
-
- mapped.addSink(sink);
-
- }
-
- @Override
- public void postSubmit() throws Exception {
- // We read the files and verify that we have read all the strings. If a valid-length
- // file exists we only read the file to that point. (This test should work with
- // FileSystems that support truncate() and with others as well.)
-
- Pattern messageRegex = Pattern.compile("message (\\d*)");
-
- // Keep a set of the message IDs that we read. Both the set size and the read count
- // must equal NUM_STRINGS; if numRead is bigger than the size of the set, we have
- // seen some elements twice.
- Set<Integer> readNumbers = Sets.newHashSet();
- int numRead = 0;
-
- RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
- outPath), true);
-
- while (files.hasNext()) {
- LocatedFileStatus file = files.next();
-
- if (!file.getPath().toString().endsWith(".valid-length")) {
- int validLength = (int) file.getLen();
- if (dfs.exists(file.getPath().suffix(".valid-length"))) {
- FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
- String validLengthString = inStream.readUTF();
- validLength = Integer.parseInt(validLengthString);
- System.out.println("VALID LENGTH: " + validLength);
- }
- FSDataInputStream inStream = dfs.open(file.getPath());
- byte[] buffer = new byte[validLength];
- inStream.readFully(0, buffer, 0, validLength);
- inStream.close();
-
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
- InputStreamReader inStreamReader = new InputStreamReader(bais);
- BufferedReader br = new BufferedReader(inStreamReader);
-
- String line = br.readLine();
- while (line != null) {
- Matcher matcher = messageRegex.matcher(line);
- if (matcher.matches()) {
- numRead++;
- int messageId = Integer.parseInt(matcher.group(1));
- readNumbers.add(messageId);
- } else {
- Assert.fail("Read line does not match expected pattern.");
- }
- line = br.readLine();
- }
- br.close();
- inStreamReader.close();
- bais.close();
- }
- }
-
- // Verify that we read all strings (at-least-once)
- Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
- // Verify that we don't have duplicates (boom!, exactly-once)
- Assert.assertEquals(NUM_STRINGS, numRead);
- }
-
- private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- private static volatile boolean hasFailed = false;
-
- private final long numElements;
-
- private long failurePos;
- private long count;
-
-
- OnceFailingIdentityMapper(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
- long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
- long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
- failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
- count = 0;
- }
-
- @Override
- public String map(String value) throws Exception {
- count++;
- if (!hasFailed && count >= failurePos) {
- hasFailed = true;
- throw new Exception("Test Failure");
- }
-
- return value;
- }
- }
-
- private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- private final long numElements;
-
- private int index;
-
- private volatile boolean isRunning = true;
-
-
- StringGeneratingSourceFunction(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- final Object lockingObject = ctx.getCheckpointLock();
-
- final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
-
- while (isRunning && index < numElements) {
-
- Thread.sleep(1);
- synchronized (lockingObject) {
- ctx.collect("message " + index);
- index += step;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- private static String randomString(StringBuilder bld, Random rnd) {
- final int len = rnd.nextInt(10) + 5;
-
- for (int i = 0; i < len; i++) {
- char next = (char) (rnd.nextInt(20000) + 33);
- bld.append(next);
- }
-
- return bld.toString();
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
- }
-
- @Override
- public void restoreState(Integer state) {
- index = state;
- }
- }
-}
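
One detail of OnceFailingIdentityMapper worth noting: Random.nextLong() may be
negative, so (new Random().nextLong() % (failurePosMax - failurePosMin)) +
failurePosMin can land below failurePosMin. The tests still pass (any failure
position triggers exactly one failure), but a variant that stays inside the
intended window could look like this sketch, assuming Java 7+:

    import java.util.concurrent.ThreadLocalRandom;

    // Illustrative variant of the failure-position pick in open():
    // uniform in [min, max), avoiding the sign issue of nextLong() % n.
    static long pickFailurePos(long min, long max) {
        return min + ThreadLocalRandom.current().nextLong(max - min);
    }
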
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
deleted file mode 100644
index 9770f41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.taskmanager.MultiShotLatch;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * Tests for {@link RollingSink}. These tests exercise the different output methods as well
- * as the rolling feature, using a manual clock that advances in lockstep with element
- * processing by means of latches.
- *
- * <p>
- * This only tests the rolling behaviour of the sink. There is a separate ITCase that
- * verifies exactly-once behaviour.
- */
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- private static MiniDFSCluster hdfsCluster;
- private static org.apache.hadoop.fs.FileSystem dfs;
- private static String hdfsURI;
-
-
- @BeforeClass
- public static void createHDFS() throws IOException {
- Configuration conf = new Configuration();
-
- File dataDir = tempFolder.newFolder();
-
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
- hdfsCluster = builder.build();
-
- dfs = hdfsCluster.getFileSystem();
-
- hdfsURI = "hdfs://"
- + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
- + "/";
- }
-
- @AfterClass
- public static void destroyHDFS() {
- hdfsCluster.shutdown();
- }
-
- /**
- * This tests {@link StringWriter} with
- * non-rolling output.
- */
- @Test
- public void testNonRollingStringWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
- final String outPath = hdfsURI + "/string-non-rolling-out";
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
-
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
- .broadcast()
- .filter(new OddEvenFilter());
-
- RollingSink<String> sink = new RollingSink<String>(outPath)
- .setBucketer(new NonRollingBucketer())
- .setPartPrefix("part")
- .setPendingPrefix("")
- .setPendingSuffix("");
-
- source
- .map(new MapFunction<Tuple2<Integer,String>, String>() {
- private static final long serialVersionUID = 1L;
- @Override
- public String map(Tuple2<Integer, String> value) throws Exception {
- return value.f1;
- }
- })
- .addSink(sink);
-
- env.execute("RollingSink String Write Test");
-
- FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
- BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
-
- inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
- br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
- }
-
- /**
- * This tests {@link SequenceFileWriter}
- * with non-rolling output and without compression.
- */
- @Test
- public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
- final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
-
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
- .broadcast()
- .filter(new OddEvenFilter());
-
- DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
- return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
- }
- });
-
-
- RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
- .setWriter(new SequenceFileWriter<IntWritable, Text>())
- .setBucketer(new NonRollingBucketer())
- .setPartPrefix("part")
- .setPendingPrefix("")
- .setPendingSuffix("");
-
- mapped.addSink(sink);
-
- env.execute("RollingSink String Write Test");
-
- FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
- SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
- 1000,
- 0,
- 100000,
- new Configuration());
-
- IntWritable intWritable = new IntWritable();
- Text txt = new Text();
-
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
- reader.next(intWritable, txt);
- Assert.assertEquals(i, intWritable.get());
- Assert.assertEquals("message #" + i, txt.toString());
- }
-
- reader.close();
- inStream.close();
-
- inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
- reader = new SequenceFile.Reader(inStream,
- 1000,
- 0,
- 100000,
- new Configuration());
-
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
- reader.next(intWritable, txt);
- Assert.assertEquals(i, intWritable.get());
- Assert.assertEquals("message #" + i, txt.toString());
- }
-
- reader.close();
- inStream.close();
- }
-
- /**
- * This tests {@link SequenceFileWriter}
- * with non-rolling output but with compression.
- */
- @Test
- public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
- final String outPath = hdfsURI + "/seq-non-rolling-out";
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
-
- DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
- .broadcast()
- .filter(new OddEvenFilter());
-
- DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
- return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
- }
- });
-
-
- RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
- .setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
- .setBucketer(new NonRollingBucketer())
- .setPartPrefix("part")
- .setPendingPrefix("")
- .setPendingSuffix("");
-
- mapped.addSink(sink);
-
- env.execute("RollingSink String Write Test");
-
- FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
- SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
- 1000,
- 0,
- 100000,
- new Configuration());
-
- IntWritable intWritable = new IntWritable();
- Text txt = new Text();
-
- for (int i = 0; i < NUM_ELEMENTS; i += 2) {
- reader.next(intWritable, txt);
- Assert.assertEquals(i, intWritable.get());
- Assert.assertEquals("message #" + i, txt.toString());
- }
-
- reader.close();
- inStream.close();
-
- inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
- reader = new SequenceFile.Reader(inStream,
- 1000,
- 0,
- 100000,
- new Configuration());
-
- for (int i = 1; i < NUM_ELEMENTS; i += 2) {
- reader.next(intWritable, txt);
- Assert.assertEquals(i, intWritable.get());
- Assert.assertEquals("message #" + i, txt.toString());
- }
-
- reader.close();
- inStream.close();
- }
-
- // we use this to synchronize the clock changes to elements being processed
- final static MultiShotLatch latch1 = new MultiShotLatch();
- final static MultiShotLatch latch2 = new MultiShotLatch();
-
- /**
- * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
- * produce rolling files. The clock of DateTimeBucketer is set to
- * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using
- * latches.
- */
- @Test
- public void testDateTimeRollingStringWriter() throws Exception {
- final int NUM_ELEMENTS = 20;
- final int PARALLELISM = 2;
- final String outPath = hdfsURI + "/rolling-out";
- DateTimeBucketer.setClock(new ModifyableClock());
- ModifyableClock.setCurrentTime(0);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
-
-
-
- DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
- NUM_ELEMENTS))
- .broadcast();
-
- // the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
- // fire the latch
- DataStream<String> mapped = source
- .flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
- private static final long serialVersionUID = 1L;
-
- int count = 0;
- @Override
- public void flatMap(Tuple2<Integer, String> value,
- Collector<String> out) throws Exception {
- out.collect(value.f1);
- count++;
- if (count >= 5) {
- if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
- latch1.trigger();
- } else {
- latch2.trigger();
- }
- count = 0;
- }
- }
-
- });
-
- RollingSink<String> sink = new RollingSink<String>(outPath)
- .setBucketer(new DateTimeBucketer("ss"))
- .setPartPrefix("part")
- .setPendingPrefix("")
- .setPendingSuffix("");
-
- mapped.addSink(sink);
-
- env.execute("RollingSink String Write Test");
-
- RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
-
- // we should have 8 rolling files, 4 time intervals and parallelism of 2
- int numFiles = 0;
- while (files.hasNext()) {
- LocatedFileStatus file = files.next();
- numFiles++;
- if (file.getPath().toString().contains("rolling-out/00")) {
- FSDataInputStream inStream = dfs.open(file.getPath());
-
- BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 0; i < 5; i++) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
- } else if (file.getPath().toString().contains("rolling-out/05")) {
- FSDataInputStream inStream = dfs.open(file.getPath());
-
- BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 5; i < 10; i++) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
- } else if (file.getPath().toString().contains("rolling-out/10")) {
- FSDataInputStream inStream = dfs.open(file.getPath());
-
- BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 10; i < 15; i++) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
- } else if (file.getPath().toString().contains("rolling-out/15")) {
- FSDataInputStream inStream = dfs.open(file.getPath());
-
- BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
- for (int i = 15; i < 20; i++) {
- String line = br.readLine();
- Assert.assertEquals("message #" + i, line);
- }
-
- inStream.close();
- } else {
- Assert.fail("File " + file + " does not match any expected roll pattern.");
- }
- }
-
- Assert.assertEquals(8, numFiles);
- }
-
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- private final int numElements;
-
- public TestSourceFunction(int numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < numElements && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- /**
- * This waits on the two multi-shot latches. The latches are triggered in a parallel
- * flatMap inside the test topology.
- */
- private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- private final int numElements;
-
- public WaitingTestSourceFunction(int numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < numElements && running; i++) {
- if (i % 5 == 0 && i > 0) {
- // update the clock after "five seconds", so we get 20 seconds in total
- // with 5 elements in each time window
- latch1.await();
- latch2.await();
- ModifyableClock.setCurrentTime(i * 1000);
- }
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple2<Integer, String> value) throws Exception {
- if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
- return value.f0 % 2 == 0;
- } else {
- return value.f0 % 2 == 1;
- }
- }
- }
-
- public static class ModifyableClock implements Clock {
-
- private static volatile long currentTime = 0;
-
- public static void setCurrentTime(long currentTime) {
- ModifyableClock.currentTime = currentTime;
- }
-
- @Override
- public long currentTimeMillis() {
- return currentTime;
- }
- }
-}
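
The tests above cover the two typical RollingSink configurations:
NonRollingBucketer for a single bucket per parallel writer, and DateTimeBucketer
for time-based rolling. For reference, a minimal sketch of a production-style
wiring against the same 0.10-era API (the output path, date format, and batch
size are placeholder values):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class RollingSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("message #0", "message #1");

            // One bucket directory per minute; part files roll at the batch size.
            RollingSink<String> sink = new RollingSink<String>("hdfs:///base/path")
                    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
                    .setBatchSize(1024L * 1024L * 384L);

            stream.addSink(sink);
            env.execute("RollingSink example");
        }
    }
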
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
deleted file mode 100644
index fe60d94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
deleted file mode 100644
index b4f5ad2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
+++ /dev/null
@@ -1,174 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-flume</artifactId>
- <name>flink-connector-flume</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <flume-ng.version>1.5.0</flume-ng.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>${flume-ng.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.tukaani</groupId>
- <artifactId>xz</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.velocity</groupId>
- <artifactId>velocity</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
- <!-- We include all dependencies that transitively depend on guava -->
- <include>org.apache.flume:*</include>
- </includes>
- </artifactSet>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 50f5770..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
- private transient FlinkRpcClientFacade client;
- boolean initDone = false;
- String host;
- int port;
- SerializationSchema<IN, byte[]> schema;
-
- public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
- this.host = host;
- this.port = port;
- this.schema = schema;
- }
-
- /**
- * Receives tuples from the Apache Flink {@link DataStream} and forwards
- * them to Apache Flume.
- *
- * @param value
- * The tuple arriving from the datastream
- */
- @Override
- public void invoke(IN value) {
-
- byte[] data = schema.serialize(value);
- client.sendDataToFlume(data);
-
- }
-
- private class FlinkRpcClientFacade {
- private RpcClient client;
- private String hostname;
- private int port;
-
- /**
- * Initializes the connection to Apache Flume.
- *
- * @param hostname
- * The host
- * @param port
- * The port.
- */
- public void init(String hostname, int port) {
- // Setup the RPC connection
- this.hostname = hostname;
- this.port = port;
- int initCounter = 0;
- while (true) {
- if (initCounter >= 90) {
- throw new RuntimeException("Cannot establish connection to " + host + " on port " + port);
- }
- try {
- this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- } catch (FlumeException e) {
- // Wait one second if the connection failed before the next
- // try
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Interrupted while trying to connect to {} on port {}", host, port);
- }
- }
- }
- if (client != null) {
- break;
- }
- initCounter++;
- }
- initDone = true;
- }
-
- /**
- * Sends byte arrays as {@link Event} series to Apache Flume.
- *
- * @param data
- * The byte array to send to Apache Flume
- */
- public void sendDataToFlume(byte[] data) {
- Event event = EventBuilder.withBody(data);
-
- try {
- client.append(event);
-
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- client.close();
- client = null;
- client = RpcClientFactory.getDefaultInstance(hostname, port);
- }
- }
-
- }
-
- @Override
- public void close() {
- client.client.close();
- }
-
- @Override
- public void open(Configuration config) {
- client = new FlinkRpcClientFacade();
- client.init(host, port);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
deleted file mode 100644
index 8fecd0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import java.util.List;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
-//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-//import org.apache.flink.util.Collector;
-//import org.apache.flume.Context;
-//import org.apache.flume.channel.ChannelProcessor;
-//import org.apache.flume.source.AvroSource;
-//import org.apache.flume.source.avro.AvroFlumeEvent;
-//import org.apache.flume.source.avro.Status;
-//
-//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
-// private static final long serialVersionUID = 1L;
-//
-// String host;
-// String port;
-// volatile boolean finished = false;
-//
-// private volatile boolean isRunning = false;
-//
-// FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
-// super(deserializationSchema);
-// this.host = host;
-// this.port = Integer.toString(port);
-// }
-//
-// public class MyAvroSource extends AvroSource {
-// Collector<OUT> output;
-//
-// /**
-// * Sends the AvroFlumeEvent from its argument list to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param avroEvent
-// * The event that should be sent to the dataStream
-// * @return A {@link Status}.OK message if sending the event was
-// * successful.
-// */
-// @Override
-// public Status append(AvroFlumeEvent avroEvent) {
-// collect(avroEvent);
-// return Status.OK;
-// }
-//
-// /**
-// * Sends the AvroFlumeEvents from its argument list to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param events
-// * The events that are sent to the dataStream
-// * @return A Status.OK message if sending the events was successful.
-// */
-// @Override
-// public Status appendBatch(List<AvroFlumeEvent> events) {
-// for (AvroFlumeEvent avroEvent : events) {
-// collect(avroEvent);
-// }
-//
-// return Status.OK;
-// }
-//
-// /**
-// * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param avroEvent
-// * The event that is sent to the dataStream
-// */
-// private void collect(AvroFlumeEvent avroEvent) {
-// byte[] b = avroEvent.getBody().array();
-// OUT out = FlumeSource.this.schema.deserialize(b);
-//
-// if (schema.isEndOfStream(out)) {
-// FlumeSource.this.finished = true;
-// this.stop();
-// FlumeSource.this.notifyAll();
-// } else {
-// output.collect(out);
-// }
-//
-// }
-//
-// }
-//
-// MyAvroSource avroSource;
-//
-// /**
-// * Configures the AvroSource. Also sets the output so the application can
-// * use it from outside of the invoke function.
-// *
-// * @param output
-// * The output used in the invoke function
-// */
-// public void configureAvroSource(Collector<OUT> output) {
-//
-// avroSource = new MyAvroSource();
-// avroSource.output = output;
-// Context context = new Context();
-// context.put("port", port);
-// context.put("bind", host);
-// avroSource.configure(context);
-// // An instance of a ChannelProcessor is required for configuring the
-// // avroSource although it will not be used in this case.
-// ChannelProcessor cp = new ChannelProcessor(null);
-// avroSource.setChannelProcessor(cp);
-// }
-//
-// /**
-// * Configures the AvroSource and runs until the user calls a close function.
-// *
-// * @param output
-// * The Collector for sending data to the datastream
-// */
-// @Override
-// public void run(Collector<OUT> output) throws Exception {
-// isRunning = true;
-// configureAvroSource(output);
-// avroSource.start();
-// while (!finished && isRunning) {
-// this.wait();
-// }
-// }
-//
-// @Override
-// public void cancel() {
-// isRunning = false;
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
deleted file mode 100644
index 45da6eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.util.serialization.SerializationSchema;
-//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-//
-//public class FlumeTopology {
-//
-// public static void main(String[] args) throws Exception {
-//
-// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-//
-// @SuppressWarnings("unused")
-// DataStream<String> inputStream1 = env.addSource(
-// new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
-// new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-//
-// env.execute();
-// }
-//
-// public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public byte[] serialize(String element) {
-// return element.getBytes();
-// }
-// }
-//
-//}
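
Since FlumeSource is commented out, the FlumeTopology above cannot compile as
written. A self-contained, sink-only variant of the same wiring (host/port and
the local environment come from the commented code; the class name
FlumeSinkExample is illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    public class FlumeSinkExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

            // Forward each record to a Flume Avro source listening on localhost:42424.
            DataStream<String> stream = env.fromElements("message #0", "message #1");
            stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));

            env.execute();
        }

        public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
            private static final long serialVersionUID = 1L;

            @Override
            public byte[] serialize(String element) {
                return element.getBytes();
            }
        }
    }
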
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
deleted file mode 100644
index f098d9c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kafka</artifactId>
- <name>flink-connector-kafka</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <kafka.version>0.8.2.0</kafka.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-annotation</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>${curator.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
- <forkCount>1</forkCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>