Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:06 UTC
[50/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
deleted file mode 100644
index 04ae40a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.elasticsearch.action.index.IndexRequest;
-
-import java.io.Serializable;
-
-/**
- * Function that creates an {@link IndexRequest} from an element in a Stream.
- *
- * <p>
- * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
- * to prepare elements for sending them to Elasticsearch. See
- * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a>
- * for information about how to format data for adding it to an Elasticsearch index.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {
- *
- * public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- * Map<String, Object> json = new HashMap<>();
- * json.put("data", element);
- *
- * return Requests.indexRequest()
- * .index("my-index")
- * .type("my-type")
- * .source(json);
- * }
- * }
- * }</pre>
- *
- * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
- */
-public interface IndexRequestBuilder<T> extends Function, Serializable {
-
- /**
- * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element.
- *
- * @param element The element that needs to be turned in to an {@code IndexRequest}
- * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink}
- *
- * @return The constructed {@code IndexRequest}
- */
- IndexRequest createIndexRequest(T element, RuntimeContext ctx);
-}
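The interface above is typed on the stream's element type, so a stream of POJOs needs a matching builder. A minimal sketch under that assumption follows; the Event type, its fields, and the index/type strings are hypothetical, while the createIndexRequest signature and the Requests.indexRequest() fluent calls come from the interface and example above:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    // Hypothetical element type, defined here only to keep the sketch self-contained.
    class Event implements Serializable {
        long id;
        String payload;
    }

    public class EventIndexRequestBuilder implements IndexRequestBuilder<Event> {
        private static final long serialVersionUID = 1L;

        @Override
        public IndexRequest createIndexRequest(Event event, RuntimeContext ctx) {
            Map<String, Object> json = new HashMap<>();
            json.put("id", event.id);
            json.put("payload", event.payload);
            return Requests.indexRequest()
                    .index("events")               // placeholder index name
                    .type("event")                 // placeholder mapping type
                    .id(Long.toString(event.id))   // an explicit id makes re-runs idempotent
                    .source(json);
        }
    }
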
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
deleted file mode 100644
index 298eb64..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import com.google.common.collect.Maps;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running, or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- for (int i = 0; i < 20 && running; i++) {
- ctx.collect("message #" + i);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
- @Override
- public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
- }));
-
-
- env.execute("Elasticsearch Example");
- }
-}
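The example above lets Elasticsearch auto-generate document ids, so its output can only be read back via search; the integration test that follows sets explicit ids, which allows direct Get lookups. A sketch of such a verification pass, assuming a Client connected to the same cluster and the index/type/id scheme used in the test below:

    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.Client;

    public class GetVerifier {

        // 'client' must be connected to the same cluster the sink wrote to.
        public static void verify(Client client, int numElements) {
            for (int i = 0; i < numElements; i++) {
                GetResponse response = client.get(
                        new GetRequest("my-index", "my-type", Integer.toString(i))).actionGet();
                // The test builder below stores the payload under the "data" key.
                System.out.println(response.getSource().get("data"));
            }
        }
    }
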
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
deleted file mode 100644
index 33a2e47..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.LocalTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.Node;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
- private static final int NUM_ELEMENTS = 20;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Test
- public void testNodeClient() throws Exception{
-
- File dataDir = tempFolder.newFolder();
-
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
-
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test
- public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- List<TransportAddress> transports = Lists.newArrayList();
- transports.add(new LocalTransportAddress("1"));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch TransportClient Test");
-
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test(expected = JobExecutionException.class)
- public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when there is no cluster to
- // connect to. We don't have such a test for the Node Client version since that
- // one will block and wait for a cluster to come online
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- List<TransportAddress> transports = Lists.newArrayList();
- transports.add(new LocalTransportAddress("1"));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
- }
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element.f1);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element.f0.toString())
- .source(json);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
deleted file mode 100644
index dc20726..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
deleted file mode 100644
index e0319b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-filesystem</artifactId>
- <name>flink-connector-filesystem</name>
-
- <packaging>jar</packaging>
-
- <!--
- This is a Hadoop2 only flink module.
- -->
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>${shading-artifact.name}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
deleted file mode 100644
index 913da97..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * A bucketer is used with a {@link RollingSink}
- * to put emitted elements into rolling files.
- *
- * <p>
- * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
- * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
- * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
- * based on system time.
- */
-public interface Bucketer extends Serializable {
-
- /**
- * Returns {@code true} when a new bucket should be started.
- *
- * @param basePath The base path containing all the buckets.
- * @param currentBucketPath The bucket {@code Path} that is currently being used.
- */
- boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
-
- /**
- * Returns the {@link Path} of a new bucket file.
- *
- * @param basePath The base path containing all the buckets.
- *
- * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath}
- * and also the {@code subtaskIndex} to avoid clashes with parallel sinks.
- */
- Path getNextBucketPath(Path basePath);
-}
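Since the two methods above form the whole contract, custom bucketing policies stay small. A hypothetical Bucketer that starts a new bucket whenever the calendar day changes, mirroring the transient-formatter pattern of the DateTimeBucketer later in this commit (the class name and format string are illustrative):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.flink.streaming.connectors.fs.Bucketer;
    import org.apache.hadoop.fs.Path;

    public class DailyBucketer implements Bucketer {
        private static final long serialVersionUID = 1L;

        // Re-created lazily after deserialization, like the DateTimeBucketer does.
        private transient SimpleDateFormat dateFormat;

        private SimpleDateFormat dateFormat() {
            if (dateFormat == null) {
                dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            }
            return dateFormat;
        }

        @Override
        public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
            // Roll as soon as the directory for "today" differs from the current bucket.
            return !getNextBucketPath(basePath).equals(currentBucketPath);
        }

        @Override
        public Path getNextBucketPath(Path basePath) {
            return new Path(basePath, dateFormat().format(new Date()));
        }
    }
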
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
deleted file mode 100644
index 152c75a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-/**
- * A clock that can provide the current time.
- *
- * <p>
- * Normally this would be system time, but for testing a custom {@code Clock} can be provided.
- */
-public interface Clock {
-
- /**
- * Returns the current time in milliseconds.
- */
- public long currentTimeMillis();
-}
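A custom Clock for tests, as the javadoc above anticipates, can be as small as a manually advanced counter. A hypothetical sketch (single-threaded test use assumed):

    import org.apache.flink.streaming.connectors.fs.Clock;

    public class ManualClock implements Clock {

        private long currentTime;

        public ManualClock(long startTimeMillis) {
            this.currentTime = startTimeMillis;
        }

        // Advance the clock; intended for single-threaded test code only.
        public void advance(long millis) {
            currentTime += millis;
        }

        @Override
        public long currentTimeMillis() {
            return currentTime;
        }
    }
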
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
deleted file mode 100644
index 0be40f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
- *
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
- * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
- * that was specified as a base path when creating the
- * {@link RollingSink}. The {@code dateTimePath}
- * is determined based on the current system time and the user provided format string.
- *
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
- * files will have a granularity of hours.
- *
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
- * }</pre>
- *
- * This will, for example, create the following bucket path:
- * {@code /base/1976-12-31--14/}
- *
- */
-public class DateTimeBucketer implements Bucketer {
-
- private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
-
- private static final long serialVersionUID = 1L;
-
- private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
-
- // We have this so that we can manually set it for tests.
- private static Clock clock = new SystemClock();
-
- private final String formatString;
-
- private transient SimpleDateFormat dateFormatter;
-
- /**
- * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
- */
- public DateTimeBucketer() {
- this(DEFAULT_FORMAT_STRING);
- }
-
- /**
- * Creates a new {@code DateTimeBucketer} with the given date/time format string.
- *
- * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
- * the bucket path.
- */
- public DateTimeBucketer(String formatString) {
- this.formatString = formatString;
-
- this.dateFormatter = new SimpleDateFormat(formatString);
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- this.dateFormatter = new SimpleDateFormat(formatString);
- }
-
-
- @Override
- public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
- String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
- return !(new Path(basePath, newDateTimeString).equals(currentBucketPath));
- }
-
- @Override
- public Path getNextBucketPath(Path basePath) {
- String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
- return new Path(basePath + "/" + newDateTimeString);
- }
-
- @Override
- public String toString() {
- return "DateTimeBucketer{" +
- "formatString='" + formatString + '\'' +
- '}';
- }
-
- /**
- * This sets the internal {@link Clock} implementation. This method should only be used for testing.
- *
- * @param newClock The new clock to set.
- */
- public static void setClock(Clock newClock) {
- clock = newClock;
- }
-}
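Because setClock(Clock) above swaps a static field, tests can make bucket paths fully deterministic. A usage sketch combining it with the hypothetical ManualClock from the Clock section (the printed path assumes a UTC default timezone):

    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.hadoop.fs.Path;

    public class BucketerClockDemo {
        public static void main(String[] args) {
            // Pin the (static) clock so bucket paths become deterministic.
            ManualClock clock = new ManualClock(0L); // epoch: 1970-01-01T00:00:00Z
            DateTimeBucketer.setClock(clock);

            DateTimeBucketer bucketer = new DateTimeBucketer("yyyy-MM-dd--HH");
            Path bucket = bucketer.getNextBucketPath(new Path("/base"));
            System.out.println(bucket); // /base/1970-01-01--00 with a UTC default timezone

            clock.advance(60 * 60 * 1000); // one hour later the bucketer wants to roll
            System.out.println(bucketer.shouldStartNewBucket(new Path("/base"), bucket)); // true
        }
    }
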
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
deleted file mode 100644
index 1307d11..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link org.apache.flink.streaming.connectors.fs.Bucketer} that does not perform any
- * rolling of files. All files are written to the base path.
- */
-public class NonRollingBucketer implements Bucketer {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
- return false;
- }
-
- @Override
- public Path getNextBucketPath(Path basePath) {
- return basePath;
- }
-
- @Override
- public String toString() {
- return "NonRollingBucketer";
- }
-}
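A brief usage note: combined with the RollingSink that follows, the NonRollingBucketer yields a flat output directory whose part files are still written fault-tolerantly. A fragment using the fluent setters documented on RollingSink below ("/tmp/out" is a placeholder path):

    RollingSink<String> sink = new RollingSink<String>("/tmp/out")
            .setBucketer(new NonRollingBucketer());
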
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
deleted file mode 100644
index c705767..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ /dev/null
@@ -1,900 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
- * is integrated with the checkpointing mechanism to provide exactly-once semantics.
- *
- * <p>
- * When creating the sink, a {@code basePath} must be specified. The base directory contains
- * one directory for every bucket. The bucket directories themselves contain several part files.
- * These contain the actual written data.
- *
- * <p>
- * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
- * base directory. Whenever the {@code Bucketer} returns a different directory name than
- * it returned before, the sink will close the current part files inside that bucket
- * and start a new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
- * date format string {@code "yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
- * using {@link #setBucketer(Bucketer)}. For example, use
- * {@link org.apache.flink.streaming.connectors.fs.NonRollingBucketer} if you don't want to have
- * buckets but still write part files in a fault-tolerant way.
- *
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink,
- * and a rolling counter, for example {@code "part-1-17"}. By default the part prefix is
- * {@code "part"} but this can be
- * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
- * than the batch size, the current part file is closed, the part counter is increased, and
- * a new part file is created. The batch size defaults to {@code 384 MB}; this can be configured
- * using {@link #setBatchSize(long)}.
- *
- * <p>
- * Part files can be in one of three states: in-progress, pending or finished. The reason for this
- * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
- * and fault-tolerance. The part file that is currently being written to is in-progress. Once
- * a part file is closed for writing, it becomes pending. When a checkpoint is successful, the
- * currently pending files will be moved to finished. If a failure occurs, the pending files
- * will be deleted to reset the state to the last checkpoint. The data in in-progress files will
- * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call,
- * this will be used to reset the file back to a previous state. If not, a special file
- * with the same name as the part file and the suffix {@code ".valid-length"} will be written
- * that contains the length up to which the file contains valid data. When reading the file
- * it must be ensured that it is only read up to that point. The prefixes and suffixes for
- * the different file states and valid-length files can be configured, for example with
- * {@link #setPendingSuffix(String)}.
- *
- * <p>
- * Note: If checkpointing is not enabled, the pending files will never be moved to the finished state.
- * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
- * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
- *
- * <p>
- * The part files are written using an instance of {@link Writer}. By default
- * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- * of {@code toString()} for every element, separated by newlines. You can configure the writer
- * using {@link #setWriter(Writer)}. For example,
- * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
- * Hadoop {@code SequenceFiles}.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * new RollingSink<Tuple2<IntWritable, Text>>(outPath)
- * .setWriter(new SequenceFileWriter<IntWritable, Text>())
- * .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
- * }</pre>
- *
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
- *
- * @see org.apache.flink.streaming.connectors.fs.DateTimeBucketer
- * @see StringWriter
- * @see SequenceFileWriter
- *
- * @param <T> Type of the elements emitted by this sink
- */
-public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointNotifier {
- private static final long serialVersionUID = 1L;
-
- private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
-
-
- // --------------------------------------------------------------------------------------------
- // User configuration values
- // --------------------------------------------------------------------------------------------
- // These are initialized with some defaults but are meant to be changeable by the user
-
- /**
- * The default maximum size of part files.
- *
- * 6 times the default block size
- */
- private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
-
- /**
- * This is used for part files that we are writing to but which were not yet confirmed
- * by a checkpoint.
- */
- private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
-
- /**
- * See above, but for prefix
- */
- private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
-
- /**
- * This is used for part files that we are no longer writing to but which are not yet confirmed
- * by a checkpoint.
- */
- private final String DEFAULT_PENDING_SUFFIX = ".pending";
-
- /**
- * See above, but for prefix.
- */
- private final String DEFAULT_PENDING_PREFIX = "_";
-
- /**
- * When truncate() is not supported by the FileSystem in use, we instead write a
- * file alongside the part file with this suffix that contains the length up to which
- * the part file is valid.
- */
- private final String DEFAULT_VALID_SUFFIX = ".valid-length";
-
- /**
- * See above, but for prefix.
- */
- private final String DEFAULT_VALID_PREFIX = "_";
-
- /**
- * The default prefix for part files.
- */
- private final String DEFAULT_PART_PREFIX = "part";
-
- /**
- * The base {@code Path} that stores all rolling bucket directories.
- */
- private final String basePath;
-
- /**
- * The {@code Bucketer} that is used to determine the path of bucket directories.
- */
- private Bucketer bucketer;
-
- /**
- * We have a template and call duplicate() for each parallel writer in open() to get the actual
- * writer that is used for the part files.
- */
- private Writer<T> writerTemplate;
-
- /**
- * The actual writer that we use for writing the part files.
- */
- private Writer<T> writer;
-
- /**
- * Maximum size of part files. If a file exceeds this, we close it and create a new one in the
- * same bucket directory.
- */
- private long batchSize;
-
- /**
- * If this is true we remove any leftover in-progress/pending files when the sink is opened.
- *
- * <p>
- * This should only be set to false if using the sink without checkpoints, to not remove
- * the files already in the directory.
- */
- private boolean cleanupOnOpen = true;
-
- // These are the actually configured prefixes/suffixes
- private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
- private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
-
- private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
- private String pendingPrefix = DEFAULT_PENDING_PREFIX;
-
- private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
- private String validLengthPrefix = DEFAULT_VALID_PREFIX;
-
- private String partPrefix = DEFAULT_PART_PREFIX;
-
- /**
- * The part file that we are currently writing to.
- */
- private transient Path currentPartPath;
-
- /**
- * The bucket directory that we are currently filling.
- */
- private transient Path currentBucketDirectory;
-
- // --------------------------------------------------------------------------------------------
- // Internal fields (not configurable by user)
- // --------------------------------------------------------------------------------------------
-
- /**
- * The {@code FSDataOutputStream} for the current part file.
- */
- private transient FSDataOutputStream outStream;
-
- /**
- * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}.
- */
- private transient int subtaskIndex;
-
- /**
- * For counting the part files inside a bucket directory. Part files follow the pattern
- * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
- */
- private transient int partCounter;
-
- /**
- * We use reflection to get the hflush method or use sync as a fallback.
- * The idea for this and the code comes from the Flume HDFS Sink.
- */
- private transient Method refHflushOrSync;
-
- /**
- * We use reflection to get the .truncate() method; it is only available starting with
- * Hadoop 2.7.
- */
- private transient Method refTruncate;
-
- /**
- * The state object that is handled by Flink for snapshot/restore. In it we store the
- * current part file path, the valid length of the in-progress files, and the pending part files.
- */
- private transient BucketState bucketState;
-
- /**
- * Creates a new {@code RollingSink} that writes files to the given base directory.
- *
- * <p>
- * This uses a {@link DateTimeBucketer} as the bucketer and a {@link StringWriter} as the writer.
- * The maximum part file size (batch size) is set to 384 MB.
- *
- * @param basePath The directory to which to write the bucket files.
- */
- public RollingSink(String basePath) {
- this.basePath = basePath;
- this.bucketer = new DateTimeBucketer();
- this.batchSize = DEFAULT_BATCH_SIZE;
- this.writerTemplate = new StringWriter<>();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- if (this.writerTemplate instanceof InputTypeConfigurable) {
- ((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
- }
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
- partCounter = 0;
-
- this.writer = writerTemplate.duplicate();
-
- if (bucketState == null) {
- bucketState = new BucketState();
- }
-
- FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
- refTruncate = reflectTruncate(fs);
-
- // delete pending/in-progress files that might be left if we fail while
- // no checkpoint has yet been done
- try {
- if (fs.exists(new Path(basePath)) && cleanupOnOpen) {
- RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
-
- while (bucketFiles.hasNext()) {
- LocatedFileStatus file = bucketFiles.next();
- if (file.getPath().toString().endsWith(pendingSuffix)) {
- // only delete files that contain our subtask index
- if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
- LOG.debug("Deleting leftover pending file {}", file.getPath().toString());
- fs.delete(file.getPath(), true);
- }
- }
- if (file.getPath().toString().endsWith(inProgressSuffix)) {
- // only delete files that contain our subtask index
- if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
- LOG.debug("Deleting leftover in-progress file {}", file.getPath().toString());
- fs.delete(file.getPath(), true);
- }
- }
- }
- }
- } catch (IOException e) {
- LOG.error("Error while deleting leftover pending/in-progress files: {}", e);
- throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e);
- }
- }
-
- @Override
- public void close() throws Exception {
-// boolean interrupted = Thread.interrupted();
- closeCurrentPartFile();
-
-// if (interrupted) {
-// Thread.currentThread().interrupt();
-// }
- }
-
- @Override
- public void invoke(T value) throws Exception {
-
- if (shouldRoll()) {
- openNewPartFile();
- }
-
- writer.write(value);
- }
-
- /**
- * Determines whether we should change the bucket file we are writing to.
- *
- * <p>
- * This will roll if no file was created yet, if the file size is larger than the specified size
- * or if the {@code Bucketer} determines that we should roll.
- */
- private boolean shouldRoll() throws IOException {
- boolean shouldRoll = false;
- if (outStream == null) {
- shouldRoll = true;
- LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
- }
- if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
- shouldRoll = true;
- LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
- // we will retrieve a new bucket base path in openNewPartFile so reset the part counter
- partCounter = 0;
- }
- if (outStream != null) {
- long writePosition = outStream.getPos();
- if (writePosition > batchSize) {
- shouldRoll = true;
- LOG.debug(
- "RollingSink {} starting new bucket because file position {} is above batch size {}.",
- subtaskIndex,
- writePosition,
- batchSize);
- }
- }
- return shouldRoll;
- }
-
- /**
- * Opens a new part file.
- *
- * <p>
- * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
- */
- private void openNewPartFile() throws Exception {
- closeCurrentPartFile();
-
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
- FileSystem fs = new Path(basePath).getFileSystem(conf);
-
- Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
-
- if (!newBucketDirectory.equals(currentBucketDirectory)) {
- currentBucketDirectory = newBucketDirectory;
- try {
- if (fs.mkdirs(currentBucketDirectory)) {
- LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
- }
- } catch (IOException e) {
- throw new RuntimeException("Could not create base path for new rolling file.", e);
- }
- }
-
-
- currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
-
- // This should work since there is only one parallel subtask that tries names with
- // our subtask id. Otherwise we would run into concurrency issues here.
- while (fs.exists(currentPartPath) || fs.exists(new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix))) {
- partCounter++;
- currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
- }
-
- // increase, so we don't have to check for this name next time
- partCounter++;
-
- LOG.debug("Next part path is {}", currentPartPath.toString());
-
- Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
-
-
-
- outStream = fs.create(inProgressPath, false);
-
- // We do the reflection here since this is the first time that we have a FSDataOutputStream
- if (refHflushOrSync == null) {
- refHflushOrSync = reflectHflushOrSync(outStream);
- }
-
- writer.open(outStream);
- }
-
- /**
- * Closes the current part file.
- *
- * <p>
- * This moves the current in-progress part file to a pending file and adds it to the list
- * of pending files in our bucket state.
- */
- private void closeCurrentPartFile() throws Exception {
- if (writer != null) {
- writer.close();
- }
-
- if (outStream != null) {
- hflushOrSync(outStream);
- outStream.close();
- outStream = null;
- }
- if (currentPartPath != null) {
- Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
- Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
- FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
- fs.rename(inProgressPath, pendingPath);
- LOG.debug("Moving in-progress bucket {} to pending file {}",
- inProgressPath,
- pendingPath);
- this.bucketState.pendingFiles.add(currentPartPath.toString());
- }
- }
-
- /**
- * If hflush is available in this version of HDFS, then this method calls
- * hflush, else it calls sync.
- *
- * <p>
- * Note: This code comes from Flume.
- *
- * @param os The stream to flush/sync
- * @throws java.io.IOException
- */
- protected void hflushOrSync(FSDataOutputStream os) throws IOException {
- try {
- // At this point the refHflushOrSync cannot be null,
- // since reflectHflushOrSync would have thrown if no method was found.
- this.refHflushOrSync.invoke(os);
- } catch (InvocationTargetException e) {
- String msg = "Error while trying to hflushOrSync!";
- LOG.error(msg + " " + e.getCause());
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- throw (IOException)cause;
- }
- throw new RuntimeException(msg, e);
- } catch (Exception e) {
- String msg = "Error while trying to hflushOrSync!";
- LOG.error(msg + " " + e);
- throw new RuntimeException(msg, e);
- }
- }
-
- /**
- * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
- *
- * <p>
- * Note: This code comes from Flume
- */
- private Method reflectHflushOrSync(FSDataOutputStream os) {
- Method m = null;
- if(os != null) {
- Class<?> fsDataOutputStreamClass = os.getClass();
- try {
- m = fsDataOutputStreamClass.getMethod("hflush");
- } catch (NoSuchMethodException ex) {
- LOG.debug("HFlush not found. Will use sync() instead");
- try {
- m = fsDataOutputStreamClass.getMethod("sync");
- } catch (Exception ex1) {
- String msg = "Neither hflush not sync were found. That seems to be " +
- "a problem!";
- LOG.error(msg);
- throw new RuntimeException(msg, ex1);
- }
- }
- }
- return m;
- }
-
- /**
- * Gets the truncate() call using reflection.
- *
- * <p>
- * Note: This code comes from Flume
- */
- private Method reflectTruncate(FileSystem fs) {
- Method m = null;
- if(fs != null) {
- Class<?> fsClass = fs.getClass();
- try {
- m = fsClass.getMethod("truncate", Path.class, long.class);
- } catch (NoSuchMethodException ex) {
- LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
- " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
- return null;
- }
-
-
- // verify that truncate actually works
- FSDataOutputStream outputStream;
- Path testPath = new Path(UUID.randomUUID().toString());
- try {
- outputStream = fs.create(testPath);
- outputStream.writeUTF("hello");
- outputStream.close();
- } catch (IOException e) {
- LOG.error("Could not create file for checking if truncate works.", e);
- throw new RuntimeException("Could not create file for checking if truncate works.", e);
- }
-
- try {
- m.invoke(fs, testPath, 2);
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.debug("Truncate is not supported.", e);
- m = null;
- }
-
- try {
- fs.delete(testPath, false);
- } catch (IOException e) {
- LOG.error("Could not delete truncate test file.", e);
- throw new RuntimeException("Could not delete truncate test file.", e);
- }
- }
- return m;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- synchronized (bucketState.pendingFilesPerCheckpoint) {
- Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
- Set<Long> checkpointsToRemove = Sets.newHashSet();
- for (Long pastCheckpointId : pastCheckpointIds) {
- if (pastCheckpointId <= checkpointId) {
- LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
- // All the pending files are buckets that have been completed but are waiting to be renamed
- // to their final name
- for (String filename : bucketState.pendingFilesPerCheckpoint.get(
- pastCheckpointId)) {
- Path finalPath = new Path(filename);
- Path pendingPath = new Path(finalPath.getParent(),
- pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
-
- FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
- fs.rename(pendingPath, finalPath);
- LOG.debug(
- "Moving pending file {} to final location after complete checkpoint {}.",
- pendingPath,
- pastCheckpointId);
- }
- checkpointsToRemove.add(pastCheckpointId);
- }
- }
- for (Long toRemove: checkpointsToRemove) {
- bucketState.pendingFilesPerCheckpoint.remove(toRemove);
- }
- }
- }
-
- @Override
- public BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- if (writer != null) {
- writer.flush();
- }
- if (outStream != null) {
- hflushOrSync(outStream);
- bucketState.currentFile = currentPartPath.toString();
- bucketState.currentFileValidLength = outStream.getPos();
- }
- synchronized (bucketState.pendingFilesPerCheckpoint) {
- bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
- }
- bucketState.pendingFiles = Lists.newArrayList();
- return bucketState;
- }
-
- @Override
- public void restoreState(BucketState state) {
- bucketState = state;
- // we can clean all the pending files since they were renamed to final files
- // after this checkpoint was successful
- bucketState.pendingFiles.clear();
- FileSystem fs = null;
- try {
- fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
- } catch (IOException e) {
- LOG.error("Error while creating FileSystem in checkpoint restore.", e);
- throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
- }
- if (bucketState.currentFile != null) {
- // We were writing to a file when the last checkpoint occurred. This file can either
- // be still in-progress or became a pending file at some point after the checkpoint.
- // Either way, we have to truncate it back to a valid state (or write a ".valid-length"
- // file that specifies up to which length it is valid) and rename it to the final name
- // before starting a new bucket file.
- Path partPath = new Path(bucketState.currentFile);
- try {
- Path partPendingPath = new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(
- pendingSuffix);
- Path partInProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix);
-
- if (fs.exists(partPendingPath)) {
- LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
- // has been moved to pending in the mean time, rename to final location
- fs.rename(partPendingPath, partPath);
- } else if (fs.exists(partInProgressPath)) {
- LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
- // it was still in progress, rename to final path
- fs.rename(partInProgressPath, partPath);
- } else {
- LOG.error("In-Progress file {} was neither moved to pending nor is still in progress.", bucketState.currentFile);
- throw new RuntimeException("In-Progress file " + bucketState.currentFile+ " " +
- "was neither moved to pending nor is still in progress.");
- }
-
- refTruncate = reflectTruncate(fs);
- // truncate it or write a ".valid-length" file to specify up to which point it is valid
- if (refTruncate != null) {
- LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
- refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
- } else {
- LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
- Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
- FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
- lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
- lengthFileOut.close();
- }
-
- // invalidate in the state object
- bucketState.currentFile = null;
- bucketState.currentFileValidLength = -1;
- } catch (IOException e) {
- LOG.error("Error while restoring RollingSink state.", e);
- throw new RuntimeException("Error while restoring RollingSink state.", e);
- } catch (InvocationTargetException | IllegalAccessException e) {
- LOG.error("Cound not invoke truncate.", e);
- throw new RuntimeException("Could not invoke truncate.", e);
- }
- }
-
- LOG.debug("Clearing pending/in-progress files.");
-
- // Move files that are confirmed by a checkpoint but did not get moved to final location
- // because the checkpoint notification did not happen before a failure
-
- Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
- LOG.debug("Moving pending files to final location on restore.");
- for (Long pastCheckpointId : pastCheckpointIds) {
- // All the pending files are buckets that have been completed but are waiting to be renamed
- // to their final name
- for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
- Path finalPath = new Path(filename);
- Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
-
- try {
- if (fs.exists(pendingPath)) {
- LOG.debug(
- "Moving pending file {} to final location after complete checkpoint {}.",
- pendingPath,
- pastCheckpointId);
- fs.rename(pendingPath, finalPath);
- }
- } catch (IOException e) {
- LOG.error("Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
- throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
- }
- }
- }
- bucketState.pendingFiles.clear();
- synchronized (bucketState.pendingFilesPerCheckpoint) {
- bucketState.pendingFilesPerCheckpoint.clear();
- }
-
- // we need to get this here since open() has not yet been called
- int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
- // delete pending files
- try {
-
- RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true);
-
- while (bucketFiles.hasNext()) {
- LocatedFileStatus file = bucketFiles.next();
- if (file.getPath().toString().endsWith(pendingSuffix)) {
- // only delete files that contain our subtask index
- if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
- fs.delete(file.getPath(), true);
- }
- }
- if (file.getPath().toString().endsWith(inProgressSuffix)) {
- // only delete files that contain our subtask index
- if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
- LOG.debug("Deleting in-progress file {}", file.getPath().toString());
- fs.delete(file.getPath(), true);
- }
- }
- }
- } catch (IOException e) {
- LOG.error("Error while deleting old pending files: {}", e);
- throw new RuntimeException("Error while deleting old pending files.", e);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Setters for User configuration values
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets the maximum bucket size in bytes.
- *
- * <p>
- * When a bucket part file becomes larger than this size, a new bucket part file is
- * started and the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
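- *
- * <p>
- * For illustration, a minimal configuration sketch (the base path, bucket format
- * and batch size are example values only):
- *
- * <pre>{@code
- * RollingSink<String> sink = new RollingSink<String>("/base/path")
- *     .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
- *     .setWriter(new StringWriter<String>())
- *     .setBatchSize(1024 * 1024 * 384); // roll to a new part file at ~384 MB
- * }</pre>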
- *
- * @param batchSize The bucket part file size in bytes.
- */
- public RollingSink<T> setBatchSize(long batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- /**
- * Sets the {@link Bucketer} to use for determining the bucket files to write to.
- *
- * @param bucketer The bucketer to use.
- */
- public RollingSink<T> setBucketer(Bucketer bucketer) {
- this.bucketer = bucketer;
- return this;
- }
-
- /**
- * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
- *
- * @param writer The {@code Writer} to use.
- */
- public RollingSink<T> setWriter(Writer<T> writer) {
- this.writerTemplate = writer;
- return this;
- }
-
- /**
- * Sets the suffix of in-progress part files. The default is {@code "in-progress"}.
- */
- public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
- this.inProgressSuffix = inProgressSuffix;
- return this;
- }
-
- /**
- * Sets the prefix of in-progress part files. The default is {@code "_"}.
- */
- public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
- this.inProgressPrefix = inProgressPrefix;
- return this;
- }
-
- /**
- * Sets the suffix of pending part files. The default is {@code ".pending"}.
- */
- public RollingSink<T> setPendingSuffix(String pendingSuffix) {
- this.pendingSuffix = pendingSuffix;
- return this;
- }
-
- /**
- * Sets the prefix of pending part files. The default is {@code "_"}.
- */
- public RollingSink<T> setPendingPrefix(String pendingPrefix) {
- this.pendingPrefix = pendingPrefix;
- return this;
- }
-
- /**
- * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
- */
- public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
- this.validLengthSuffix = validLengthSuffix;
- return this;
- }
-
- /**
- * Sets the prefix of valid-length files. The default is {@code "_"}.
- */
- public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
- this.validLengthPrefix = validLengthPrefix;
- return this;
- }
-
- /**
- * Sets the prefix of part files. The default is {@code "part"}.
- */
- public RollingSink<T> setPartPrefix(String partPrefix) {
- this.partPrefix = partPrefix;
- return this;
- }
-
- /**
- * Disables cleanup of leftover in-progress/pending files when the sink is opened.
- *
- * <p>
- * Cleanup should only be disabled when the sink is used without checkpointing, so that
- * files that are already in the directory are not removed.
- */
- public RollingSink<T> disableCleanupOnOpen() {
- this.cleanupOnOpen = false;
- return this;
- }
-
- // --------------------------------------------------------------------------------------------
- // Internal Classes
- // --------------------------------------------------------------------------------------------
-
- /**
- * This is used for keeping track of the current in-progress files and files that we mark
- * for moving from pending to final location after we get a checkpoint-complete notification.
- */
- static final class BucketState implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * The file that was in-progress when the last checkpoint occurred.
- */
- String currentFile = null;
-
- /**
- * The valid length of the in-progress file at the time of the last checkpoint.
- */
- long currentFileValidLength = -1;
-
- /**
- * Pending files that accumulated since the last checkpoint.
- */
- List<String> pendingFiles = Lists.newArrayList();
-
- /**
- * When doing a checkpoint we move the pending files since the last checkpoint to this map
- * with the id of the checkpoint. When we get the checkpoint-complete notification we move
- * pending files of completed checkpoints to their final location.
- */
- final Map<Long, List<String>> pendingFilesPerCheckpoint = Maps.newHashMap();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
deleted file mode 100644
index 928d96e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-
-import java.io.IOException;
-
-/**
- * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
- * The input to the {@link RollingSink} must be a
- * {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadoop
- * {@link org.apache.hadoop.io.Writable Writables}.
- *
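- * <p>
- * A minimal usage sketch (the key/value types and the codec name are illustrative):
- *
- * <pre>{@code
- * RollingSink<Tuple2<IntWritable, Text>> sink =
- *     new RollingSink<Tuple2<IntWritable, Text>>("/base/path");
- * sink.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK));
- * }</pre>
- *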
- * @param <K> The type of the first tuple field.
- * @param <V> The type of the second tuple field.
- */
-public class SequenceFileWriter<K extends Writable, V extends Writable> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
- private static final long serialVersionUID = 1L;
-
- private final String compressionCodecName;
-
- private SequenceFile.CompressionType compressionType;
-
- private transient FSDataOutputStream outputStream;
-
- private transient SequenceFile.Writer writer;
-
- private Class<K> keyClass;
-
- private Class<V> valueClass;
-
- /**
- * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
- */
- public SequenceFileWriter() {
- this("None", SequenceFile.CompressionType.NONE);
- }
-
- /**
- * Creates a new {@code SequenceFileWriter} that writes sequence files with the given
- * compression codec and compression type.
- *
- * @param compressionCodecName Name of a Hadoop Compression Codec.
- * @param compressionType The compression type to use.
- */
- public SequenceFileWriter(String compressionCodecName,
- SequenceFile.CompressionType compressionType) {
- this.compressionCodecName = compressionCodecName;
- this.compressionType = compressionType;
- }
-
- @Override
- public void open(FSDataOutputStream outStream) throws IOException {
- if (outputStream != null) {
- throw new IllegalStateException("SequenceFileWriter has already been opened.");
- }
- if (keyClass == null) {
- throw new IllegalStateException("Key Class has not been initialized.");
- }
- if (valueClass == null) {
- throw new IllegalStateException("Value Class has not been initialized.");
- }
-
- this.outputStream = outStream;
-
- CompressionCodec codec = null;
-
- if (!compressionCodecName.equals("None")) {
- CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
- codec = codecFactory.getCodecByName(compressionCodecName);
- if (codec == null) {
- throw new RuntimeException("Codec " + compressionCodecName + " not found.");
- }
- }
-
- // the non-deprecated createWriter() overload is only available in recent Hadoop versions...
- writer = SequenceFile.createWriter(new Configuration(),
- outStream,
- keyClass,
- valueClass,
- compressionType,
- codec);
- }
-
- @Override
- public void flush() throws IOException {
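- // no internal flush here; the RollingSink flushes/syncs the underlying
- // FSDataOutputStream when it checkpoints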
- }
-
- @Override
- public void close() throws IOException {
- if (writer != null) {
- writer.close();
- }
- writer = null;
- outputStream = null;
- }
-
- @Override
- public void write(Tuple2<K, V> element) throws IOException {
- if (outputStream == null) {
- throw new IllegalStateException("SequenceFileWriter has not been opened.");
- }
- writer.append(element.f0, element.f1);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- if (!type.isTupleType()) {
- throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
- }
-
- TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
-
- if (tupleType.getArity() != 2) {
- throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
- }
-
- TypeInformation<K> keyType = tupleType.getTypeAt(0);
- TypeInformation<V> valueType = tupleType.getTypeAt(1);
-
- this.keyClass = keyType.getTypeClass();
- this.valueClass = valueType.getTypeClass();
- }
-
- @Override
- public Writer<Tuple2<K, V>> duplicate() {
- SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
- result.keyClass = keyClass;
- result.valueClass = valueClass;
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
deleted file mode 100644
index ad0ab46..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-
-/**
- * A {@link Writer} that calls {@code toString()} on the input elements and writes them to
- * the output bucket file, separated by newlines.
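- *
- * <p>
- * For example (sketch only; the element type and charset are illustrative):
- *
- * <pre>{@code
- * RollingSink<String> sink = new RollingSink<String>("/base/path");
- * sink.setWriter(new StringWriter<String>("UTF-8"));
- * }</pre>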
- *
- * @param <T> The type of the elements that are being written by the sink.
- */
-public class StringWriter<T> implements Writer<T> {
- private static final long serialVersionUID = 1L;
-
- private transient FSDataOutputStream outputStream;
-
- private String charsetName;
-
- private transient Charset charset;
-
- /**
- * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
- * strings to bytes.
- */
- public StringWriter() {
- this("UTF-8");
- }
-
- /**
- * Creates a new {@code StringWriter} that uses the given charset to convert
- * strings to bytes.
- *
- * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
- */
- public StringWriter(String charsetName) {
- this.charsetName = charsetName;
- }
-
- @Override
- public void open(FSDataOutputStream outStream) throws IOException {
- if (outputStream != null) {
- throw new IllegalStateException("StringWriter has already been opened.");
- }
- this.outputStream = outStream;
-
- try {
- this.charset = Charset.forName(charsetName);
- }
- catch (IllegalCharsetNameException e) {
- throw new IOException("The charset " + charsetName + " is not valid.", e);
- }
- catch (UnsupportedCharsetException e) {
- throw new IOException("The charset " + charsetName + " is not supported.", e);
- }
- }
-
- @Override
- public void flush() throws IOException {
- }
-
- @Override
- public void close() throws IOException {
- outputStream = null;
- }
-
- @Override
- public void write(T element) throws IOException {
- if (outputStream == null) {
- throw new IllegalStateException("StringWriter has not been opened.");
- }
- outputStream.write(element.toString().getBytes(charset));
- outputStream.write('\n');
- }
-
- @Override
- public Writer<T> duplicate() {
- // pass the configured charset on; otherwise the duplicate would silently fall back to UTF-8
- return new StringWriter<>(charsetName);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
deleted file mode 100644
index 2bab8cf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-/**
- * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
- */
-public class SystemClock implements Clock {
- @Override
- public long currentTimeMillis() {
- return System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
deleted file mode 100644
index 98cad32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An implementation of {@code Writer} is used in conjunction with a
- * {@link RollingSink} to perform the actual
- * writing to the bucket files.
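- *
- * <p>
- * Example (a minimal sketch of a custom {@code Writer}; the class name is hypothetical):
- *
- * <pre>{@code
- * public class ToStringWriter<T> implements Writer<T> {
- *
- *     private transient FSDataOutputStream outStream;
- *
- *     public void open(FSDataOutputStream outStream) throws IOException {
- *         this.outStream = outStream;
- *     }
- *
- *     public void flush() throws IOException {
- *         // nothing is buffered internally
- *     }
- *
- *     public void close() throws IOException {
- *         // must not close the stream itself, only release the reference
- *         outStream = null;
- *     }
- *
- *     public void write(T element) throws IOException {
- *         outStream.write(element.toString().getBytes("UTF-8"));
- *     }
- *
- *     public Writer<T> duplicate() {
- *         return new ToStringWriter<T>();
- *     }
- * }
- * }</pre>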
- *
- * @param <T> The type of the elements that are being written by the sink.
- */
-public interface Writer<T> extends Serializable {
-
- /**
- * Initializes the {@code Writer} for a newly opened bucket file.
- * Any internal per-bucket initialization should be performed here.
- *
- * @param outStream The {@link org.apache.hadoop.fs.FSDataOutputStream} for the newly opened file.
- */
- void open(FSDataOutputStream outStream) throws IOException;
-
- /**
- * Flushes out any internally held data.
- */
- void flush() throws IOException;
-
- /**
- * Closes the {@code Writer}. This must not close the {@code FSDataOutputStream} that
- * was passed in via the {@link #open} method. Only internally held state should be
- * closed.
- */
- void close() throws IOException;
-
- /**
- * Writes one element to the bucket file.
- */
- void write(T element) throws IOException;
-
- /**
- * Duplicates the {@code Writer}. This is used to get one {@code Writer} for each
- * parallel instance of the sink.
- */
- Writer<T> duplicate();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
deleted file mode 100644
index fe60d94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target=System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger