You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:28 UTC
[12/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
new file mode 100644
index 0000000..33a2e47
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.Node;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Integration tests for {@link ElasticsearchSink} that run against an embedded, local
+ * Elasticsearch {@link Node}.
+ *
+ * <p>
+ * Each positive test starts a local node, runs a small streaming job that writes
+ * {@link #NUM_ELEMENTS} documents through the sink, and then reads the documents back
+ * through the node's own client. The node is always closed in a {@code finally} block so
+ * that a failing job or assertion does not leave a running node behind.
+ */
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+
+    private static final int NUM_ELEMENTS = 20;
+
+    // a custom cluster name is used to verify that user config works correctly
+    private static final String CLUSTER_NAME = "my-node-client-cluster";
+
+    private static final String INDEX_NAME = "my-index";
+
+    private static final String TYPE_NAME = "my-type";
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /**
+     * Writes elements using the sink constructor that takes only a config and verifies
+     * that all of them arrive in the embedded node.
+     */
+    @Test
+    public void testNodeClient() throws Exception {
+        Node node = startLocalNode(tempFolder.newFolder());
+        try {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+            DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+            Map<String, String> config = createSinkConfig();
+
+            source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
+
+            env.execute("Elasticsearch Node Client Test");
+
+            verifyResults(node.client());
+        } finally {
+            // release the embedded node even if the job or an assertion failed
+            node.close();
+        }
+    }
+
+    /**
+     * Writes elements using the sink constructor that additionally takes a list of
+     * transport addresses and verifies that all of them arrive in the embedded node.
+     */
+    @Test
+    public void testTransportClient() throws Exception {
+        Node node = startLocalNode(tempFolder.newFolder());
+        try {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+            DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+            Map<String, String> config = createSinkConfig();
+            List<TransportAddress> transports = createLocalTransports();
+
+            source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
+
+            env.execute("Elasticsearch TransportClient Test");
+
+            verifyResults(node.client());
+        } finally {
+            // release the embedded node even if the job or an assertion failed
+            node.close();
+        }
+    }
+
+    /**
+     * Runs the transport-address variant of the sink without starting any node and expects
+     * the job to fail.
+     */
+    @Test(expected = JobExecutionException.class)
+    public void testTransportClientFails() throws Exception {
+        // this checks whether the TransportClient fails early when there is no cluster to
+        // connect to. We don't have such a test for the Node Client version since that
+        // one will block and wait for a cluster to come online
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+        // same configuration as the positive tests, but no node has been started
+        Map<String, String> config = createSinkConfig();
+        List<TransportAddress> transports = createLocalTransports();
+
+        source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
+
+        env.execute("Elasticsearch TransportClient Fails Test");
+    }
+
+    /**
+     * Starts an embedded, local Elasticsearch node with HTTP disabled.
+     *
+     * @param dataDir The directory the node uses as its {@code path.data}.
+     * @return The started node; the caller is responsible for closing it.
+     */
+    private static Node startLocalNode(File dataDir) {
+        return nodeBuilder()
+                .settings(ImmutableSettings.settingsBuilder()
+                        .put("http.enabled", false)
+                        .put("path.data", dataDir.getAbsolutePath()))
+                // set a custom cluster name to verify that user config works correctly
+                .clusterName(CLUSTER_NAME)
+                .local(true)
+                .node();
+    }
+
+    /**
+     * Creates the user configuration that is handed to the sink in all tests.
+     */
+    private static Map<String, String> createSinkConfig() {
+        Map<String, String> config = Maps.newHashMap();
+        // This instructs the sink to emit after every element, otherwise they would be buffered
+        config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+        config.put("cluster.name", CLUSTER_NAME);
+
+        // connect to a local (in-JVM) node
+        config.put("node.local", "true");
+        return config;
+    }
+
+    /**
+     * Creates the single local transport address used by the transport-address tests.
+     */
+    private static List<TransportAddress> createLocalTransports() {
+        List<TransportAddress> transports = Lists.newArrayList();
+        transports.add(new LocalTransportAddress("1"));
+        return transports;
+    }
+
+    /**
+     * Asserts that every element emitted by {@link TestSourceFunction} can be read back
+     * from the index under its id.
+     *
+     * @param client The client used to issue the get requests.
+     */
+    private static void verifyResults(Client client) {
+        for (int i = 0; i < NUM_ELEMENTS; i++) {
+            GetResponse response = client.get(new GetRequest(INDEX_NAME,
+                    TYPE_NAME,
+                    Integer.toString(i))).actionGet();
+            Assert.assertEquals("message #" + i, response.getSource().get("data"));
+        }
+    }
+
+    /**
+     * Source that emits {@code (i, "message #" + i)} for {@code i} in
+     * {@code [0, NUM_ELEMENTS)} and then finishes.
+     */
+    private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+        private static final long serialVersionUID = 1L;
+
+        // volatile because cancel() sets the flag while run() reads it in its loop
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                ctx.collect(Tuple2.of(i, "message #" + i));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /**
+     * Turns a tuple into an index request: field {@code f0} becomes the document id and
+     * field {@code f1} is stored under the key {@code "data"}.
+     */
+    private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
+            Map<String, Object> json = new HashMap<>();
+            json.put("data", element.f1);
+
+            return Requests.indexRequest()
+                    .index(INDEX_NAME)
+                    .type(TYPE_NAME)
+                    .id(element.f0.toString())
+                    .source(json);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..dc20726
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
new file mode 100644
index 0000000..7c4dc98
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-filesystem</artifactId>
+ <name>flink-connector-filesystem</name>
+
+ <packaging>jar</packaging>
+
+ <!--
+ This is a Hadoop2 only flink module.
+ -->
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
new file mode 100644
index 0000000..913da97
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link RollingSink}
+ * to put emitted elements into rolling files.
+ *
+ * <p>
+ * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
+ * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
+ * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
+ * based on system time.
+ *
+ * <p>
+ * The interface extends {@link Serializable}, so implementations must be serializable.
+ */
+public interface Bucketer extends Serializable {
+
+ /**
+ * Returns {@code true} when a new bucket should be started.
+ *
+ * @param basePath The base path containing all the buckets.
+ * @param currentBucketPath The bucket {@code Path} that is currently being used.
+ *
+ * @return {@code true} if a new bucket should be started, {@code false} if the current
+ * bucket should continue to be used.
+ */
+ boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
+
+ /**
+ * Returns the {@link Path} of a new bucket file.
+ *
+ * @param basePath The base path containing all the buckets.
+ *
+ * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath}
+ * and also the {@code subtaskIndex} to avoid clashes with parallel sinks.
+ * NOTE(review): no subtask index is passed to this method; confirm whether the
+ * caller or the implementation is expected to add it.
+ */
+ Path getNextBucketPath(Path basePath);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
new file mode 100644
index 0000000..152c75a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * Source of the current time.
+ *
+ * <p>
+ * Production code would normally back this with the system time; tests can supply their
+ * own {@code Clock} to control the time that is reported.
+ */
+public interface Clock {
+
+    /**
+     * Returns the current time of this clock.
+     *
+     * @return The current time in milliseconds.
+     */
+    long currentTimeMillis();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
new file mode 100644
index 0000000..0be40f5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ * <p>
+ * The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link RollingSink}. The {@code dateTimePath}
+ * is determined based on the current system time and the user provided format string.
+ *
+ * <p>
+ * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * This will create for example the following bucket path:
+ * {@code /base/1976-12-31--14/}
+ *
+ * <p>
+ * Instances hold a {@link SimpleDateFormat}, which is not safe for concurrent use, so a
+ * single instance must not be shared between threads without external synchronization.
+ */
+public class DateTimeBucketer implements Bucketer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+    // We have this so that we can manually set it for tests. It is static and therefore
+    // shared by all DateTimeBucketer instances in the JVM.
+    private static Clock clock = new SystemClock();
+
+    private final String formatString;
+
+    // SimpleDateFormat is not serialized with the bucketer; it is re-created from
+    // formatString in readObject().
+    private transient SimpleDateFormat dateFormatter;
+
+    /**
+     * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+     */
+    public DateTimeBucketer() {
+        this(DEFAULT_FORMAT_STRING);
+    }
+
+    /**
+     * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+     *
+     * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+     *                     the bucket path.
+     */
+    public DateTimeBucketer(String formatString) {
+        this.formatString = formatString;
+
+        this.dateFormatter = new SimpleDateFormat(formatString);
+    }
+
+    /**
+     * Restores the non-transient fields and re-creates the transient date formatter from
+     * the format string.
+     */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        this.dateFormatter = new SimpleDateFormat(formatString);
+    }
+
+    /**
+     * Returns {@code true} when the bucket path derived from the current time differs from
+     * the bucket that is currently being used.
+     *
+     * @param basePath The base path containing all the buckets.
+     * @param currentBucketPath The bucket {@code Path} that is currently being used.
+     */
+    @Override
+    public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
+        // Delegate to getNextBucketPath() so that the comparison always uses exactly the
+        // path that would be handed out for a new bucket, instead of building the path in
+        // a second, slightly different way.
+        return !getNextBucketPath(basePath).equals(currentBucketPath);
+    }
+
+    /**
+     * Returns the bucket path for the current time, i.e. {@code basePath} followed by the
+     * current time formatted with the configured format string.
+     *
+     * @param basePath The base path containing all the buckets.
+     */
+    @Override
+    public Path getNextBucketPath(Path basePath) {
+        String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+        return new Path(basePath + "/" + newDateTimeString);
+    }
+
+    @Override
+    public String toString() {
+        return "DateTimeBucketer{" +
+                "formatString='" + formatString + '\'' +
+                '}';
+    }
+
+    /**
+     * This sets the internal {@link Clock} implementation. This method should only be used for testing.
+     *
+     * <p>
+     * The clock is a static field, so the change affects every {@code DateTimeBucketer}
+     * in the JVM.
+     *
+     * @param newClock The new clock to set.
+     */
+    public static void setClock(Clock newClock) {
+        clock = newClock;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
new file mode 100644
index 0000000..1307d11
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link org.apache.flink.streaming.connectors.fs.Bucketer} implementation that never rolls
+ * over to a new bucket. The base path itself is used as the one and only bucket.
+ */
+public class NonRollingBucketer implements Bucketer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Always returns {@code false}: a new bucket is never requested.
+     */
+    @Override
+    public boolean shouldStartNewBucket(final Path basePath, final Path currentBucketPath) {
+        return false;
+    }
+
+    /**
+     * Returns the given {@code basePath} unchanged.
+     */
+    @Override
+    public Path getNextBucketPath(final Path basePath) {
+        return basePath;
+    }
+
+    @Override
+    public String toString() {
+        return "NonRollingBucketer";
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
new file mode 100644
index 0000000..c705767
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -0,0 +1,900 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
+ * is integrated with the checkpointing mechanism to provide exactly-once semantics.
+ *
+ * <p>
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files.
+ * These contain the actual written data.
+ *
+ * <p>
+ * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
+ * base directory. Whenever the {@code Bucketer} returns a different directory name than
+ * it returned before the sink will close the current part files inside that bucket
+ * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
+ * date format string {@code "yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
+ * using {@link #setBucketer(Bucketer)}. For example, use
+ * {@link org.apache.flink.streaming.connectors.fs.NonRollingBucketer} if you don't want to have
+ * buckets but still write part files in a fault-tolerant way.
+ *
+ * <p>
+ * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
+ * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
+ * {@code "part"} but this can be
+ * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
+ * than the batch size the current part file is closed, the part counter is increased and
+ * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setBatchSize(long)}.
+ *
+ * <p>
+ * Part files can be in one of three states: in-progress, pending or finished. The reason for this
+ * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
+ * and fault-tolerance. The part file that is currently being written to is in-progress. Once
+ * a part file is closed for writing it becomes pending. When a checkpoint is successful the
+ * currently pending files will be moved to finished. If a failure occurs the pending files
+ * will be deleted to reset state to the last checkpoint. The data in in-progress files will
+ * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
+ * this will be used to reset the file back to a previous state. If not, a special file
+ * with the same name as the part file and the suffix {@code ".valid-length"} will be written
+ * that contains the length up to which the file contains valid data. When reading the file
+ * it must be ensured that it is only read up to that point. The prefixes and suffixes for
+ * the different file states and valid-length files can be configured, for example with
+ * {@link #setPendingSuffix(String)}.
+ *
+ * <p>
+ * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
+ * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
+ * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
+ *
+ * <p>
+ * The part files are written using an instance of {@link Writer}. By default
+ * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
+ * of {@code toString()} for every element, separated by newlines. You can configure the writer
+ * using {@link #setWriter(Writer)}. For example,
+ * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
+ * Hadoop {@code SequenceFiles}.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+ * .setWriter(new SequenceFileWriter<IntWritable, Text>())
+ * .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+ * }</pre>
+ *
+ * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
+ *
+ * @see org.apache.flink.streaming.connectors.fs.DateTimeBucketer
+ * @see StringWriter
+ * @see SequenceFileWriter
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointNotifier {
+ private static final long serialVersionUID = 1L;
+
+ /** Logger of this class; declared {@code final} so that it cannot be reassigned. */
+ private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
+
+
+ // --------------------------------------------------------------------------------------------
+ // User configuration values
+ // --------------------------------------------------------------------------------------------
+ // These are initialized with some defaults but are meant to be changeable by the user
+
+ /**
+  * The default maximum size of part files in bytes (384 MB).
+  *
+  * <p>
+  * According to the original author this is 6 times the default block size.
+  */
+ private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+
+ /**
+  * Default suffix for part files that we are writing to but which were not yet confirmed
+  * by a checkpoint.
+  */
+ private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+
+ /**
+  * Default prefix for part files that we are writing to but which were not yet confirmed
+  * by a checkpoint.
+  */
+ private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+
+ /**
+  * Default suffix for part files that we are not writing to anymore but which are not yet
+  * confirmed by a checkpoint.
+  */
+ private static final String DEFAULT_PENDING_SUFFIX = ".pending";
+
+ /**
+  * Default prefix for part files that we are not writing to anymore but which are not yet
+  * confirmed by a checkpoint.
+  */
+ private static final String DEFAULT_PENDING_PREFIX = "_";
+
+ /**
+  * When truncate() is not supported on the used FileSystem we instead write a
+  * file along the part file with this ending that contains the length up to which
+  * the part file is valid.
+  */
+ private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
+
+ /**
+  * Default prefix of the valid-length files described at {@link #DEFAULT_VALID_SUFFIX}.
+  */
+ private static final String DEFAULT_VALID_PREFIX = "_";
+
+ /**
+  * The default prefix for part files.
+  */
+ private static final String DEFAULT_PART_PREFIX = "part";
+
+ /**
+  * The base path under which all rolling bucket directories are stored.
+  */
+ private final String basePath;
+
+ /**
+  * The {@code Bucketer} that is used to determine the path of bucket directories.
+  */
+ private Bucketer bucketer;
+
+ /**
+  * We have a template and call duplicate() for each parallel writer in open() to get the actual
+  * writer that is used for the part files.
+  */
+ private Writer<T> writerTemplate;
+
+ /**
+  * The actual writer that we use for writing the part files.
+  */
+ private Writer<T> writer;
+
+ /**
+  * Maximum size of part files. If files exceed this we close and create a new one in the same
+  * bucket directory.
+  */
+ private long batchSize;
+
+ /**
+  * If this is true we remove any leftover in-progress/pending files when the sink is opened.
+  *
+  * <p>
+  * This should only be set to false if using the sink without checkpoints, to not remove
+  * the files already in the directory.
+  */
+ private boolean cleanupOnOpen = true;
+
+ // These are the actually configured prefixes/suffixes
+ private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
+ private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
+
+ private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
+ private String pendingPrefix = DEFAULT_PENDING_PREFIX;
+
+ private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
+ private String validLengthPrefix = DEFAULT_VALID_PREFIX;
+
+ private String partPrefix = DEFAULT_PART_PREFIX;
+
+ /**
+ * The part file that we are currently writing to.
+ */
+ private transient Path currentPartPath;
+
+ /**
+ * The bucket directory that we are currently filling.
+ */
+ private transient Path currentBucketDirectory;
+
+ // --------------------------------------------------------------------------------------------
+ // Internal fields (not configurable by user)
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * The {@code FSDataOutputStream} for the current part file.
+ */
+ private transient FSDataOutputStream outStream;
+
+ /**
+ * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}.
+ */
+ private transient int subtaskIndex;
+
+ /**
+ * For counting the part files inside a bucket directory. Part files follow the pattern
+ * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
+ */
+ private transient int partCounter;
+
+ /**
+ * We use reflection to get the hflush method or use sync as a fallback.
+ * The idea for this and the code comes from the Flume HDFS Sink.
+ */
+ private transient Method refHflushOrSync;
+
+ /**
+ * We use reflection to get the .truncate() method, this is only available starting with
+ * Hadoop 2.7
+ */
+ private transient Method refTruncate;
+
+ /**
+ * The state object that is handled by Flink from snapshot/restore. In there we store the
+ * current part file path, the valid length of the in-progress files and pending part files.
+ */
+ private transient BucketState bucketState;
+
+ /**
+  * Creates a new {@code RollingSink} that writes its files below the given base directory.
+  *
+  * <p>
+  * The sink starts out with a {@link DateTimeBucketer} as bucketer, a {@link StringWriter}
+  * as writer and the default maximum part file size of 384 MB.
+  *
+  * @param basePath The directory to which to write the bucket files.
+  */
+ public RollingSink(String basePath) {
+     this.writerTemplate = new StringWriter<>();
+     this.batchSize = DEFAULT_BATCH_SIZE;
+     this.bucketer = new DateTimeBucketer();
+     this.basePath = basePath;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ if (this.writerTemplate instanceof InputTypeConfigurable) {
+ ((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ partCounter = 0;
+
+ this.writer = writerTemplate.duplicate();
+
+ if (bucketState == null) {
+ bucketState = new BucketState();
+ }
+
+ FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+ refTruncate = reflectTruncate(fs);
+
+ // delete pending/in-progress files that might be left if we fail while
+ // no checkpoint has yet been done
+ try {
+ if (fs.exists(new Path(basePath)) && cleanupOnOpen) {
+ RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
+
+ while (bucketFiles.hasNext()) {
+ LocatedFileStatus file = bucketFiles.next();
+ if (file.getPath().toString().endsWith(pendingSuffix)) {
+ // only delete files that contain our subtask index
+ if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+ LOG.debug("Deleting leftover pending file {}", file.getPath().toString());
+ fs.delete(file.getPath(), true);
+ }
+ }
+ if (file.getPath().toString().endsWith(inProgressSuffix)) {
+ // only delete files that contain our subtask index
+ if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+ LOG.debug("Deleting leftover in-progress file {}", file.getPath().toString());
+ fs.delete(file.getPath(), true);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Error while deleting leftover pending/in-progress files: {}", e);
+ throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e);
+ }
+ }
+
+ /**
+ * Closes this sink by closing the current part file via {@link #closeCurrentPartFile()}.
+ */
+ @Override
+ public void close() throws Exception {
+ // NOTE(review): the commented-out lines below look like a disabled attempt to clear the
+ // thread's interrupt flag before closing and to restore it afterwards - confirm whether
+ // this is still needed before removing them.
+// boolean interrupted = Thread.interrupted();
+ closeCurrentPartFile();
+
+// if (interrupted) {
+// Thread.currentThread().interrupt();
+// }
+ }
+
+ /**
+ * Writes one element with the current writer, first opening a new part file if
+ * {@link #shouldRoll()} says so.
+ *
+ * @param value The element that is handed to the writer.
+ */
+ @Override
+ public void invoke(T value) throws Exception {
+
+ if (shouldRoll()) {
+ openNewPartFile();
+ }
+
+ writer.write(value);
+ }
+
+ /**
+ * Determines whether we should change the bucket file we are writing to.
+ *
+ * <p>
+ * This will roll if no file was created yet, if the file size is larger than the specified size
+ * or if the {@code Bucketer} determines that we should roll.
+ */
+ private boolean shouldRoll() throws IOException {
+ boolean shouldRoll = false;
+ if (outStream == null) {
+ shouldRoll = true;
+ LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
+ }
+ if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
+ shouldRoll = true;
+ LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
+ // we will retrieve a new bucket base path in openNewPartFile so reset the part counter
+ partCounter = 0;
+ }
+ if (outStream != null) {
+ long writePosition = outStream.getPos();
+ if (outStream != null && writePosition > batchSize) {
+ shouldRoll = true;
+ LOG.debug(
+ "RollingSink {} starting new bucket because file position {} is above batch size {}.",
+ subtaskIndex,
+ writePosition,
+ batchSize);
+ }
+ }
+ return shouldRoll;
+ }
+
+ /**
+  * Opens a new part file.
+  *
+  * <p>
+  * This closes the old part file, retrieves the bucket path from the {@code Bucketer}
+  * (creating the bucket directory if it changed), picks the next unused part file name and
+  * opens the writer on the in-progress file of that name.
+  *
+  * <p>
+  * A part file name counts as used if a final, a pending or an in-progress file of that
+  * name exists. Checking the in-progress file as well avoids a failure of the non-overwriting
+  * {@code create} call when such a file was left behind, for example because cleanup on open
+  * is disabled.
+  *
+  * @throws RuntimeException If the bucket directory cannot be created.
+  */
+ private void openNewPartFile() throws Exception {
+     closeCurrentPartFile();
+
+     Path baseDirectory = new Path(basePath);
+     FileSystem fs = baseDirectory.getFileSystem(new org.apache.hadoop.conf.Configuration());
+
+     Path newBucketDirectory = bucketer.getNextBucketPath(baseDirectory);
+
+     if (!newBucketDirectory.equals(currentBucketDirectory)) {
+         currentBucketDirectory = newBucketDirectory;
+         try {
+             if (fs.mkdirs(currentBucketDirectory)) {
+                 LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
+             }
+         } catch (IOException e) {
+             throw new RuntimeException("Could not create base path for new rolling file.", e);
+         }
+     }
+
+     // This should work since there is only one parallel subtask that tries names with
+     // our subtask id. Otherwise we would run into concurrency issues here.
+     Path inProgressPath;
+     boolean nameTaken;
+     do {
+         currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
+         Path parent = currentPartPath.getParent();
+         String name = currentPartPath.getName();
+         Path pendingPath = new Path(parent, pendingPrefix + name).suffix(pendingSuffix);
+         inProgressPath = new Path(parent, inProgressPrefix + name).suffix(inProgressSuffix);
+
+         nameTaken = fs.exists(currentPartPath) || fs.exists(pendingPath) || fs.exists(inProgressPath);
+
+         // increase in any case, so we don't have to check for this name next time
+         partCounter++;
+     } while (nameTaken);
+
+     LOG.debug("Next part path is {}", currentPartPath.toString());
+
+     outStream = fs.create(inProgressPath, false);
+
+     // We do the reflection here since this is the first time that we have a FSDataOutputStream
+     if (refHflushOrSync == null) {
+         refHflushOrSync = reflectHflushOrSync(outStream);
+     }
+
+     writer.open(outStream);
+ }
+
+ /**
+  * Closes the current part file.
+  *
+  * <p>
+  * This closes the writer, flushes and closes the output stream (if one is open), moves the
+  * current in-progress part file to a pending file and adds it to the list of pending files
+  * in our bucket state.
+  *
+  * <p>
+  * After the move the current part path is forgotten, so calling this method again without
+  * opening a new part file neither repeats the rename nor registers the same pending file
+  * a second time.
+  */
+ private void closeCurrentPartFile() throws Exception {
+     if (writer != null) {
+         writer.close();
+     }
+
+     if (outStream != null) {
+         hflushOrSync(outStream);
+         outStream.close();
+         outStream = null;
+     }
+
+     if (currentPartPath != null) {
+         Path parent = currentPartPath.getParent();
+         String name = currentPartPath.getName();
+         Path inProgressPath = new Path(parent, inProgressPrefix + name).suffix(inProgressSuffix);
+         Path pendingPath = new Path(parent, pendingPrefix + name).suffix(pendingSuffix);
+
+         FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+         fs.rename(inProgressPath, pendingPath);
+         LOG.debug("Moving in-progress bucket {} to pending file {}",
+             inProgressPath,
+             pendingPath);
+         this.bucketState.pendingFiles.add(currentPartPath.toString());
+
+         // the part file is handed over to the pending list, make repeated calls a no-op
+         currentPartPath = null;
+     }
+ }
+
+ /**
+ * If hflush is available in this version of HDFS, then this method calls
+ * hflush, else it calls sync.
+ * @param os - The stream to flush/sync
+ * @throws java.io.IOException
+ *
+ * <p>
+ * Note: This code comes from Flume
+ */
+ protected void hflushOrSync(FSDataOutputStream os) throws IOException {
+ try {
+ // At this point the refHflushOrSync cannot be null,
+ // since register method would have thrown if it was.
+ this.refHflushOrSync.invoke(os);
+ } catch (InvocationTargetException e) {
+ String msg = "Error while trying to hflushOrSync!";
+ LOG.error(msg + " " + e.getCause());
+ Throwable cause = e.getCause();
+ if(cause != null && cause instanceof IOException) {
+ throw (IOException)cause;
+ }
+ throw new RuntimeException(msg, e);
+ } catch (Exception e) {
+ String msg = "Error while trying to hflushOrSync!";
+ LOG.error(msg + " " + e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ /**
+  * Looks up the {@code hflush} method on the class of the given stream via reflection and
+  * falls back to {@code sync} if {@code hflush} does not exist.
+  *
+  * <p>
+  * Note: This code comes from Flume
+  *
+  * @param os The stream whose class is inspected, may be {@code null}.
+  * @return The method to call for flushing, or {@code null} if {@code os} is {@code null}.
+  * @throws RuntimeException If neither of the two methods can be looked up.
+  */
+ private Method reflectHflushOrSync(FSDataOutputStream os) {
+     if (os == null) {
+         return null;
+     }
+
+     Class<?> streamClass = os.getClass();
+     try {
+         return streamClass.getMethod("hflush");
+     } catch (NoSuchMethodException ex) {
+         LOG.debug("HFlush not found. Will use sync() instead");
+     }
+
+     try {
+         return streamClass.getMethod("sync");
+     } catch (Exception ex1) {
+         String msg = "Neither hflush not sync were found. That seems to be " +
+             "a problem!";
+         LOG.error(msg);
+         throw new RuntimeException(msg, ex1);
+     }
+ }
+
+ /**
+  * Gets the truncate() call using reflection and verifies that it actually works.
+  *
+  * <p>
+  * The verification writes a small probe file with a random name, invokes truncate on it
+  * and deletes the probe file again. The probe stream is opened in a try-with-resources
+  * statement so that it is closed even if writing to it fails.
+  *
+  * <p>
+  * Note: This code comes from Flume
+  *
+  * @param fs The file system to inspect, may be {@code null}.
+  * @return The truncate method, or {@code null} if {@code fs} is {@code null}, the method
+  *         does not exist or invoking it on the probe file failed.
+  * @throws RuntimeException If the probe file cannot be created or deleted.
+  */
+ private Method reflectTruncate(FileSystem fs) {
+     if (fs == null) {
+         return null;
+     }
+
+     Method m;
+     try {
+         m = fs.getClass().getMethod("truncate", Path.class, long.class);
+     } catch (NoSuchMethodException ex) {
+         LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
+             " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
+         return null;
+     }
+
+     // verify that truncate actually works
+     Path testPath = new Path(UUID.randomUUID().toString());
+     try (FSDataOutputStream outputStream = fs.create(testPath)) {
+         outputStream.writeUTF("hello");
+     } catch (IOException e) {
+         LOG.error("Could not create file for checking if truncate works.", e);
+         throw new RuntimeException("Could not create file for checking if truncate works.", e);
+     }
+
+     try {
+         m.invoke(fs, testPath, 2L);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+         LOG.debug("Truncate is not supported.", e);
+         m = null;
+     }
+
+     try {
+         fs.delete(testPath, false);
+     } catch (IOException e) {
+         LOG.error("Could not delete truncate test file.", e);
+         throw new RuntimeException("Could not delete truncate test file.", e);
+     }
+
+     return m;
+ }
+
+
+ /**
+  * Moves the pending files of all checkpoints up to and including the given one to their
+  * final location and afterwards drops those checkpoints from the bucket state.
+  *
+  * @param checkpointId The id of the checkpoint that has been completed.
+  */
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+     synchronized (bucketState.pendingFilesPerCheckpoint) {
+         List<Long> confirmedCheckpoints = Lists.newArrayList();
+
+         for (Map.Entry<Long, List<String>> pending : bucketState.pendingFilesPerCheckpoint.entrySet()) {
+             Long pastCheckpointId = pending.getKey();
+             if (pastCheckpointId > checkpointId) {
+                 // belongs to a later checkpoint that is not yet confirmed
+                 continue;
+             }
+
+             LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
+             // All the pending files are buckets that have been completed but are waiting to be renamed
+             // to their final name
+             for (String filename : pending.getValue()) {
+                 Path finalPath = new Path(filename);
+                 Path pendingPath = new Path(finalPath.getParent(),
+                     pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+
+                 FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+                 fs.rename(pendingPath, finalPath);
+                 LOG.debug(
+                     "Moving pending file {} to final location after complete checkpoint {}.",
+                     pendingPath,
+                     pastCheckpointId);
+             }
+             confirmedCheckpoints.add(pastCheckpointId);
+         }
+
+         // removal is deferred so that the map is not modified while iterating over it
+         for (Long confirmed : confirmedCheckpoints) {
+             bucketState.pendingFilesPerCheckpoint.remove(confirmed);
+         }
+     }
+ }
+
+ /**
+ * Flushes the writer and the open stream and returns the bucket state for the checkpoint.
+ *
+ * <p>
+ * If a part file is open, its path and current write position are recorded in the state.
+ * The pending files collected so far are registered under the given checkpoint id and a
+ * fresh, empty list is started for the files that become pending afterwards.
+ *
+ * @param checkpointId The id of the checkpoint, used as key for the pending files.
+ * @param checkpointTimestamp The timestamp of the checkpoint, not used here.
+ * @return The bucket state object of this sink.
+ */
+ @Override
+ public BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if (writer != null) {
+ writer.flush();
+ }
+ if (outStream != null) {
+ // sync first so that the position recorded below covers data handed to the stream
+ hflushOrSync(outStream);
+ bucketState.currentFile = currentPartPath.toString();
+ bucketState.currentFileValidLength = outStream.getPos();
+ }
+ synchronized (bucketState.pendingFilesPerCheckpoint) {
+ bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
+ }
+ // the list stored in the map above must not receive files that become pending later
+ bucketState.pendingFiles = Lists.newArrayList();
+ return bucketState;
+ }
+
+ @Override
+ public void restoreState(BucketState state) {
+ bucketState = state;
+ // we can clean all the pending files since they where renamed to final files
+ // after this checkpoint was successfull
+ bucketState.pendingFiles.clear();
+ FileSystem fs = null;
+ try {
+ fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem in checkpoint restore.", e);
+ throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
+ }
+ if (bucketState.currentFile != null) {
+ // We were writing to a file when the last checkpoint occured. This file can either
+ // be still in-progress or became a pending file at some point after the checkpoint.
+ // Either way, we have to truncate it back to a valid state (or write a .valid-length)
+ // file that specifies up to which length it is valid and rename it to the final name
+ // before starting a new bucket file.
+ Path partPath = new Path(bucketState.currentFile);
+ try {
+ Path partPendingPath = new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(
+ pendingSuffix);
+ Path partInProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix);
+
+ if (fs.exists(partPendingPath)) {
+ LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+ // has been moved to pending in the mean time, rename to final location
+ fs.rename(partPendingPath, partPath);
+ } else if (fs.exists(partInProgressPath)) {
+ LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+ // it was still in progress, rename to final path
+ fs.rename(partInProgressPath, partPath);
+ } else {
+ LOG.error("In-Progress file {} was neither moved to pending nor is still in progress.", bucketState.currentFile);
+ throw new RuntimeException("In-Progress file " + bucketState.currentFile+ " " +
+ "was neither moved to pending nor is still in progress.");
+ }
+
+ refTruncate = reflectTruncate(fs);
+ // truncate it or write a ".valid-length" file to specify up to which point it is valid
+ if (refTruncate != null) {
+ LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
+ refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+ } else {
+ LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
+ Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
+ FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+ lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+ lengthFileOut.close();
+ }
+
+ // invalidate in the state object
+ bucketState.currentFile = null;
+ bucketState.currentFileValidLength = -1;
+ } catch (IOException e) {
+ LOG.error("Error while restoring RollingSink state.", e);
+ throw new RuntimeException("Error while restoring RollingSink state.", e);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ LOG.error("Cound not invoke truncate.", e);
+ throw new RuntimeException("Could not invoke truncate.", e);
+ }
+ }
+
+ LOG.debug("Clearing pending/in-progress files.");
+
+ // Move files that are confirmed by a checkpoint but did not get moved to final location
+ // because the checkpoint notification did not happen before a failure
+
+ Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
+ LOG.debug("Moving pending files to final location on restore.");
+ for (Long pastCheckpointId : pastCheckpointIds) {
+ // All the pending files are buckets that have been completed but are waiting to be renamed
+ // to their final name
+ for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+ Path finalPath = new Path(filename);
+ Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+
+ try {
+ if (fs.exists(pendingPath)) {
+ LOG.debug(
+ "Moving pending file {} to final location after complete checkpoint {}.",
+ pendingPath,
+ pastCheckpointId);
+ fs.rename(pendingPath, finalPath);
+ }
+ } catch (IOException e) {
+ LOG.error("Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+ throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
+ }
+ }
+ }
+ bucketState.pendingFiles.clear();
+ synchronized (bucketState.pendingFilesPerCheckpoint) {
+ bucketState.pendingFilesPerCheckpoint.clear();
+ }
+
+ // we need to get this here since open() has not yet been called
+ int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ // delete pending files
+ try {
+
+ RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
+
+ while (bucketFiles.hasNext()) {
+ LocatedFileStatus file = bucketFiles.next();
+ if (file.getPath().toString().endsWith(pendingSuffix)) {
+ // only delete files that contain our subtask index
+ if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+ fs.delete(file.getPath(), true);
+ }
+ }
+ if (file.getPath().toString().endsWith(inProgressSuffix)) {
+ // only delete files that contain our subtask index
+ if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+ LOG.debug("Deleting in-progress file {}", file.getPath().toString());
+ fs.delete(file.getPath(), true);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Error while deleting old pending files: {}", e);
+ throw new RuntimeException("Error while deleting old pending files.", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Setters for User configuration values
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the maximum bucket size in bytes.
+ *
+ * <p>
+ * When a bucket part file becomes larger than this size a new bucket part file is started and
+ * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
+ *
+ * @param batchSize The bucket part file size in bytes.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setBatchSize(long batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ /**
+ * Sets the {@link Bucketer} to use for determining the bucket files to write to.
+ *
+ * @param bucketer The bucketer to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setBucketer(Bucketer bucketer) {
+ this.bucketer = bucketer;
+ return this;
+ }
+
+ /**
+ * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
+ * The given writer is stored as the template from which the actual writer is duplicated.
+ *
+ * @param writer The {@code Writer} to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setWriter(Writer<T> writer) {
+ this.writerTemplate = writer;
+ return this;
+ }
+
+ /**
+ * Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
+ *
+ * @param inProgressSuffix The suffix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
+ this.inProgressSuffix = inProgressSuffix;
+ return this;
+ }
+
+ /**
+ * Sets the prefix of in-progress part files. The default is {@code "_"}.
+ *
+ * @param inProgressPrefix The prefix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
+ this.inProgressPrefix = inProgressPrefix;
+ return this;
+ }
+
+ /**
+ * Sets the suffix of pending part files. The default is {@code ".pending"}.
+ *
+ * @param pendingSuffix The suffix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setPendingSuffix(String pendingSuffix) {
+ this.pendingSuffix = pendingSuffix;
+ return this;
+ }
+
+ /**
+ * Sets the prefix of pending part files. The default is {@code "_"}.
+ *
+ * @param pendingPrefix The prefix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setPendingPrefix(String pendingPrefix) {
+ this.pendingPrefix = pendingPrefix;
+ return this;
+ }
+
+ /**
+ * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
+ *
+ * @param validLengthSuffix The suffix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
+ this.validLengthSuffix = validLengthSuffix;
+ return this;
+ }
+
+ /**
+ * Sets the prefix of valid-length files. The default is {@code "_"}.
+ *
+ * @param validLengthPrefix The prefix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
+ this.validLengthPrefix = validLengthPrefix;
+ return this;
+ }
+
+ /**
+ * Sets the prefix of part files. The default is {@code "part"}.
+ *
+ * @param partPrefix The prefix to use.
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> setPartPrefix(String partPrefix) {
+ this.partPrefix = partPrefix;
+ return this;
+ }
+
+ /**
+ * Disable cleanup of leftover in-progress/pending files when the sink is opened.
+ *
+ * <p>
+ * This should only be disabled if using the sink without checkpoints, to not remove
+ * the files already in the directory.
+ *
+ * @return This sink, to allow chaining of setter calls.
+ */
+ public RollingSink<T> disableCleanupOnOpen() {
+ this.cleanupOnOpen = false;
+ return this;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Internal Classes
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This is used for keeping track of the current in-progress files and files that we mark
+ * for moving from pending to final location after we get a checkpoint-complete notification.
+ */
+ static final class BucketState implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The file that was in-progress when the last checkpoint occurred, or {@code null} if
+ * none is recorded.
+ */
+ String currentFile = null;
+
+ /**
+ * The valid length of the in-progress file at the time of the last checkpoint, or
+ * {@code -1} if none is recorded.
+ */
+ long currentFileValidLength = -1;
+
+ /**
+ * Pending files that accumulated since the last checkpoint.
+ */
+ List<String> pendingFiles = Lists.newArrayList();
+
+ /**
+ * When doing a checkpoint we move the pending files since the last checkpoint to this map
+ * with the id of the checkpoint. When we get the checkpoint-complete notification we move
+ * pending files of completed checkpoints to their final location.
+ */
+ final Map<Long, List<String>> pendingFilesPerCheckpoint = Maps.newHashMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
new file mode 100644
index 0000000..928d96e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
+ * The input to the {@link RollingSink} must be a {@link org.apache.flink.api.java.tuple.Tuple2}
+ * of two Hadoop {@link org.apache.hadoop.io.Writable Writables}.
+ *
+ * <p>The key and value classes are taken from the type information handed to
+ * {@link #setInputType(TypeInformation, ExecutionConfig)}; {@link #open(FSDataOutputStream)}
+ * fails with an {@link IllegalStateException} if they have not been set.
+ *
+ * @param <K> The type of the first tuple field.
+ * @param <V> The type of the second tuple field.
+ */
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+    private static final long serialVersionUID = 1L;
+
+    /** Codec name that stands for "do not look up a compression codec". */
+    private static final String NO_CODEC = "None";
+
+    /** Name of the Hadoop compression codec, or {@code "None"}. */
+    private final String compressionCodecName;
+
+    /** Compression type that is passed to the Hadoop sequence file writer. */
+    private final SequenceFile.CompressionType compressionType;
+
+    /** Stream of the currently open bucket file; {@code null} while this writer is not open. */
+    private transient FSDataOutputStream outputStream;
+
+    /** Hadoop writer created in {@link #open(FSDataOutputStream)}; {@code null} while not open. */
+    private transient SequenceFile.Writer writer;
+
+    /** Class of the tuple's first field, set by {@link #setInputType} or {@link #duplicate()}. */
+    private Class<K> keyClass;
+
+    /** Class of the tuple's second field, set by {@link #setInputType} or {@link #duplicate()}. */
+    private Class<V> valueClass;
+
+    /**
+     * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
+     */
+    public SequenceFileWriter() {
+        this(NO_CODEC, SequenceFile.CompressionType.NONE);
+    }
+
+    /**
+     * Creates a new {@code SequenceFileWriter} that writes sequence files with the given
+     * compression codec and compression type.
+     *
+     * @param compressionCodecName Name of a Hadoop Compression Codec, or {@code "None"} to
+     *                             skip the codec lookup.
+     * @param compressionType The compression type to use.
+     */
+    public SequenceFileWriter(String compressionCodecName,
+            SequenceFile.CompressionType compressionType) {
+        this.compressionCodecName = compressionCodecName;
+        this.compressionType = compressionType;
+    }
+
+    /**
+     * Creates the Hadoop sequence file writer on top of the given stream.
+     *
+     * @param outStream The stream of the newly opened bucket file.
+     * @throws IllegalStateException if the writer is already open or the key/value classes
+     *                               have not been set.
+     * @throws RuntimeException if the configured codec name cannot be resolved.
+     * @throws IOException if the sequence file writer cannot be created.
+     */
+    @Override
+    public void open(FSDataOutputStream outStream) throws IOException {
+        if (outputStream != null) {
+            throw new IllegalStateException("SequenceFileWriter has already been opened.");
+        }
+        if (keyClass == null) {
+            throw new IllegalStateException("Key Class has not been initialized.");
+        }
+        if (valueClass == null) {
+            throw new IllegalStateException("Value Class has not been initialized.");
+        }
+
+        // One configuration is enough for both the codec lookup and the writer.
+        Configuration hadoopConf = new Configuration();
+
+        CompressionCodec codec = null;
+
+        if (!compressionCodecName.equals(NO_CODEC)) {
+            CompressionCodecFactory codecFactory = new CompressionCodecFactory(hadoopConf);
+            codec = codecFactory.getCodecByName(compressionCodecName);
+            if (codec == null) {
+                throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+            }
+        }
+
+        // the non-deprecated constructor syntax is only available in recent hadoop versions...
+        writer = SequenceFile.createWriter(hadoopConf,
+                outStream,
+                keyClass,
+                valueClass,
+                compressionType,
+                codec);
+
+        // Record the stream only after everything above succeeded. Otherwise a failed open()
+        // would leave this writer looking "opened" while the Hadoop writer is still null.
+        this.outputStream = outStream;
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override
+    public void flush() throws IOException {
+    }
+
+    /**
+     * Closes the Hadoop sequence file writer, if there is one, and marks this writer as not
+     * open. The state is reset even when closing the Hadoop writer fails.
+     *
+     * @throws IOException if closing the Hadoop writer fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } finally {
+            writer = null;
+            outputStream = null;
+        }
+    }
+
+    /**
+     * Appends the two tuple fields as key and value to the sequence file.
+     *
+     * @param element The tuple whose fields are appended.
+     * @throws IllegalStateException if the writer has not been opened.
+     * @throws IOException if appending to the sequence file fails.
+     */
+    @Override
+    public void write(Tuple2<K, V> element) throws IOException {
+        if (outputStream == null) {
+            throw new IllegalStateException("SequenceFileWriter has not been opened.");
+        }
+        writer.append(element.f0, element.f1);
+    }
+
+    /**
+     * Derives the key and value classes from the given input type, which has to be a tuple
+     * type with exactly two fields.
+     *
+     * @param type The type information of the sink input.
+     * @param executionConfig Not used by this implementation.
+     * @throws IllegalArgumentException if the type is not a tuple type of arity 2.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+        if (!type.isTupleType()) {
+            throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
+        }
+
+        TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+        if (tupleType.getArity() != 2) {
+            throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
+        }
+
+        TypeInformation<K> keyType = tupleType.getTypeAt(0);
+        TypeInformation<V> valueType = tupleType.getTypeAt(1);
+
+        this.keyClass = keyType.getTypeClass();
+        this.valueClass = valueType.getTypeClass();
+    }
+
+    /**
+     * Creates a new, unopened writer with the same codec name, compression type and
+     * key/value classes as this one.
+     */
+    @Override
+    public Writer<Tuple2<K, V>> duplicate() {
+        SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
+        result.keyClass = keyClass;
+        result.valueClass = valueClass;
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
new file mode 100644
index 0000000..ad0ab46
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
+
+/**
+ * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
+ * the output bucket file separated by newline.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class StringWriter<T> implements Writer<T> {
+    private static final long serialVersionUID = 1L;
+
+    /** Stream of the currently open bucket file; {@code null} while this writer is not open. */
+    private transient FSDataOutputStream outputStream;
+
+    /** Name of the charset that is used to convert the strings to bytes. */
+    private final String charsetName;
+
+    /** Charset looked up from {@link #charsetName} in {@link #open(FSDataOutputStream)}. */
+    private transient Charset charset;
+
+    /**
+     * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
+     * strings to bytes.
+     */
+    public StringWriter() {
+        this("UTF-8");
+    }
+
+    /**
+     * Creates a new {@code StringWriter} that uses the given charset to convert
+     * strings to bytes.
+     *
+     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+     */
+    public StringWriter(String charsetName) {
+        this.charsetName = charsetName;
+    }
+
+    /**
+     * Looks up the configured charset and remembers the stream of the newly opened file.
+     *
+     * @param outStream The stream of the newly opened bucket file.
+     * @throws IllegalStateException if the writer is already open.
+     * @throws IOException if the charset name is illegal or the charset is not supported.
+     */
+    @Override
+    public void open(FSDataOutputStream outStream) throws IOException {
+        if (outputStream != null) {
+            throw new IllegalStateException("StringWriter has already been opened.");
+        }
+
+        // Resolve the charset before recording the stream, so that a failed open() does not
+        // leave this writer looking "opened" while the charset is still unset.
+        try {
+            this.charset = Charset.forName(charsetName);
+        }
+        catch (IllegalCharsetNameException e) {
+            throw new IOException("The charset " + charsetName + " is not valid.", e);
+        }
+        catch (UnsupportedCharsetException e) {
+            throw new IOException("The charset " + charsetName + " is not supported.", e);
+        }
+
+        this.outputStream = outStream;
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override
+    public void flush() throws IOException {
+
+    }
+
+    /**
+     * Marks this writer as not open by dropping the reference to the stream. The stream itself
+     * is not closed here.
+     */
+    @Override
+    public void close() throws IOException {
+        outputStream = null;
+    }
+
+    /**
+     * Writes {@code element.toString()}, encoded with the configured charset, followed by a
+     * single {@code '\n'}.
+     *
+     * @param element The element to write.
+     * @throws IllegalStateException if the writer has not been opened.
+     * @throws IOException if writing to the stream fails.
+     */
+    @Override
+    public void write(T element) throws IOException {
+        if (outputStream == null) {
+            throw new IllegalStateException("StringWriter has not been opened.");
+        }
+        outputStream.write(element.toString().getBytes(charset));
+        outputStream.write('\n');
+
+    }
+
+    /**
+     * Creates a new, unopened {@code StringWriter} that uses the same charset as this one.
+     */
+    @Override
+    public Writer<T> duplicate() {
+        // Pass the charset on; the no-argument constructor would silently fall back to UTF-8.
+        return new StringWriter<>(charsetName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
new file mode 100644
index 0000000..2bab8cf
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * {@link Clock} implementation that reports the time returned by
+ * {@code System.currentTimeMillis()}.
+ */
+public class SystemClock implements Clock {
+    /**
+     * Reads the current time from the system.
+     *
+     * @return the value of {@code System.currentTimeMillis()} at the time of the call.
+     */
+    @Override
+    public long currentTimeMillis() {
+        final long now = System.currentTimeMillis();
+        return now;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
new file mode 100644
index 0000000..98cad32
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Performs the actual writing to the bucket files of a {@link RollingSink}. The sink is
+ * used in conjunction with an implementation of {@code Writer}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public interface Writer<T> extends Serializable {
+
+    /**
+     * Initializes the {@code Writer} for a newly opened bucket file.
+     * Any internal per-bucket initialization should be performed here.
+     *
+     * @param outStream The {@link org.apache.hadoop.fs.FSDataOutputStream} for the newly opened file.
+     * @throws IOException if the initialization fails.
+     */
+    void open(FSDataOutputStream outStream) throws IOException;
+
+    /**
+     * Flushes out any internally held data.
+     *
+     * @throws IOException if flushing fails.
+     */
+    void flush() throws IOException;
+
+    /**
+     * Closes the {@code Writer}. Only internally held state should be closed: the
+     * {@code FSDataOutputStream} that was handed in in the {@link #open} method must not be
+     * closed by this method.
+     *
+     * @throws IOException if closing the internal state fails.
+     */
+    void close() throws IOException;
+
+    /**
+     * Writes one element to the bucket file.
+     *
+     * @param element The element to write.
+     * @throws IOException if writing fails.
+     */
+    void write(T element) throws IOException;
+
+    /**
+     * Duplicates the {@code Writer}. This is used to get one {@code Writer} for each
+     * parallel instance of the sink.
+     *
+     * @return the duplicated {@code Writer}.
+     */
+    Writer<T> duplicate();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
new file mode 100644
index 0000000..7d127ff
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -0,0 +1,289 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RollingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly once behaviour of the sink. Another test tests the
+ * rolling behaviour.
+ *
+ * <p>
+ * This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely
+ * high. This provokes the case that the sink restarts without any checkpoint having been performed.
+ * This tests the initial cleanup of pending/in-progress files.
+ */
+public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
+
+    final long NUM_STRINGS = 16_000;
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static MiniDFSCluster hdfsCluster;
+    private static org.apache.hadoop.fs.FileSystem dfs;
+
+    private static String outPath;
+
+    /**
+     * Starts a {@link MiniDFSCluster} inside a fresh temporary folder and derives the HDFS
+     * output path that the sink writes to.
+     */
+    @BeforeClass
+    public static void createHDFS() throws IOException {
+        Configuration conf = new Configuration();
+
+        File dataDir = tempFolder.newFolder();
+
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        hdfsCluster = builder.build();
+
+        dfs = hdfsCluster.getFileSystem();
+
+        outPath = "hdfs://"
+                + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+                + "/string-non-rolling-out-no-checkpoint";
+    }
+
+    /**
+     * Shuts the {@link MiniDFSCluster} down, if it was started.
+     */
+    @AfterClass
+    public static void destroyHDFS() {
+        if (hdfsCluster != null) {
+            hdfsCluster.shutdown();
+        }
+    }
+
+    /**
+     * Builds the job: string source, a mapper that fails once, and a {@link RollingSink}
+     * writing to {@code outPath}. Checkpointing is enabled with an interval of
+     * {@code Long.MAX_VALUE}.
+     */
+    @Override
+    public void testProgram(StreamExecutionEnvironment env) {
+        assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+        int parallelism = 6;
+
+        env.enableCheckpointing(Long.MAX_VALUE);
+        env.setParallelism(parallelism);
+        env.disableOperatorChaining();
+
+        DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+        DataStream<String> mapped = stream
+                .map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+        RollingSink<String> sink = new RollingSink<String>(outPath)
+                .setBucketer(new NonRollingBucketer())
+                .setBatchSize(5000)
+                .setValidLengthPrefix("")
+                .setPendingPrefix("");
+
+        mapped.addSink(sink);
+
+    }
+
+    /**
+     * Reads back all files below {@code outPath} and verifies that every message was written
+     * exactly once.
+     */
+    @Override
+    public void postSubmit() throws Exception {
+        // We read the files and verify that we have read all the strings. If a valid-length
+        // file exists we only read the file to that point. (This test should work with
+        // FileSystems that support truncate() and with others as well.)
+
+        Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+        // Keep a set of the message IDs that we read. The size must equal the read count and
+        // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+        // elements twice.
+        Set<Integer> readNumbers = Sets.newHashSet();
+        int numRead = 0;
+
+        RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+                outPath), true);
+
+        while (files.hasNext()) {
+            LocatedFileStatus file = files.next();
+
+            // The valid-length files are only consulted alongside their data file below.
+            if (file.getPath().toString().endsWith(".valid-length")) {
+                continue;
+            }
+
+            int validLength = (int) file.getLen();
+            Path validLengthPath = file.getPath().suffix(".valid-length");
+            if (dfs.exists(validLengthPath)) {
+                // try-with-resources: this stream was never closed before.
+                try (FSDataInputStream lengthStream = dfs.open(validLengthPath)) {
+                    String validLengthString = lengthStream.readUTF();
+                    validLength = Integer.parseInt(validLengthString);
+                }
+                System.out.println("VALID LENGTH: " + validLength);
+            }
+
+            byte[] buffer = new byte[validLength];
+            try (FSDataInputStream inStream = dfs.open(file.getPath())) {
+                inStream.readFully(0, buffer, 0, validLength);
+            }
+
+            // Decode with an explicit charset instead of the platform default; the sink under
+            // test is created without a writer, TODO confirm that its default writer is UTF-8.
+            try (BufferedReader br = new BufferedReader(
+                    new InputStreamReader(new ByteArrayInputStream(buffer), "UTF-8"))) {
+                String line = br.readLine();
+                while (line != null) {
+                    Matcher matcher = messageRegex.matcher(line);
+                    if (matcher.matches()) {
+                        numRead++;
+                        int messageId = Integer.parseInt(matcher.group(1));
+                        readNumbers.add(messageId);
+                    } else {
+                        Assert.fail("Read line does not match expected pattern.");
+                    }
+                    line = br.readLine();
+                }
+            }
+        }
+
+        // Verify that we read all strings (at-least-once)
+        Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+        // Verify that we don't have duplicates (boom!, exactly-once)
+        Assert.assertEquals(NUM_STRINGS, numRead);
+    }
+
+    /**
+     * Identity mapper that throws an exception once: the first instance whose element count
+     * reaches its randomly chosen failure position sets the static {@code hasFailed} flag and
+     * fails.
+     */
+    private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+        private static final long serialVersionUID = 1L;
+
+        private static volatile boolean hasFailed = false;
+
+        private final long numElements;
+
+        private long failurePos;
+        private long count;
+
+
+        OnceFailingIdentityMapper(long numElements) {
+            this.numElements = numElements;
+        }
+
+        /**
+         * Picks the failure position between 40% (inclusive) and 70% (exclusive) of the number
+         * of elements per parallel subtask and resets the element count.
+         */
+        @Override
+        public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+            long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+            long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+            long range = failurePosMax - failurePosMin;
+
+            // nextLong() % range lies in (-range, range), so it can be negative and would push
+            // failurePos below failurePosMin. Its absolute value is always below range, so
+            // Math.abs cannot overflow here. An empty range falls back to the minimum instead
+            // of dividing by zero.
+            long offset = range > 0 ? Math.abs(new Random().nextLong() % range) : 0;
+
+            failurePos = failurePosMin + offset;
+            count = 0;
+        }
+
+        /**
+         * Returns the value unchanged, or throws the test exception once the failure position
+         * has been reached and no failure was recorded yet.
+         */
+        @Override
+        public String map(String value) throws Exception {
+            count++;
+            if (!hasFailed && count >= failurePos) {
+                hasFailed = true;
+                throw new Exception("Test Failure");
+            }
+
+            return value;
+        }
+    }
+
+    /**
+     * Parallel source that emits the strings {@code "message <index>"}. Every subtask starts at
+     * its own subtask index and advances by the number of parallel subtasks. The current index
+     * is the checkpointed state.
+     */
+    private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+            implements CheckpointedAsynchronously<Integer> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final long numElements;
+
+        private int index;
+
+        private volatile boolean isRunning = true;
+
+
+        StringGeneratingSourceFunction(long numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            final Object lockingObject = ctx.getCheckpointLock();
+
+            final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+            // An index of 0 is treated as "nothing set yet": start at this subtask's own index.
+            if (index == 0) {
+                index = getRuntimeContext().getIndexOfThisSubtask();
+            }
+
+            while (isRunning && index < numElements) {
+
+                Thread.sleep(1);
+                // Emit and advance the index together under the checkpoint lock.
+                synchronized (lockingObject) {
+                    ctx.collect("message " + index);
+                    index += step;
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+
+        @Override
+        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+            return index;
+        }
+
+        @Override
+        public void restoreState(Integer state) {
+            index = state;
+        }
+    }
+}