Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:07 UTC
[17/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
deleted file mode 100644
index 51f55b3..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
- <name>flink-connector-elasticsearch2</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <elasticsearch.version>2.3.5</elasticsearch.version>
- </properties>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
-	<!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
deleted file mode 100644
index 650931f..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
- private final BulkProcessor bulkProcessor;
-
- public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
- this.bulkProcessor = bulkProcessor;
- }
-
- @Override
- public void add(ActionRequest... actionRequests) {
- for (ActionRequest actionRequest : actionRequests) {
- this.bulkProcessor.add(actionRequest);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
deleted file mode 100644
index e839589..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
- *
- * <p>
- * When using the constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)}, a {@link TransportClient} will
- * be used.
- *
- * <p>
- * <b>Attention:</b> When using the {@code TransportClient}, the sink will fail if it cannot
- * connect to a cluster.
- *
- * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating the
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, which should be set to the name
- * of the cluster that the sink should emit to.
- *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
- * This will buffer elements before sending a request to the cluster. The behaviour of the
- * {@code BulkProcessor} can be configured using these config keys:
- * <ul>
- * <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
- * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
- * <li> {@code bulk.flush.interval.ms}: Interval (in milliseconds) at which to flush data
- * regardless of the other two settings
- * </ul>
- *
- * <p>
- * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link ElasticsearchSinkFunction} for an example.
- *
- * @param <T> Type of the elements emitted by this sink
- */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
- public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
- /**
- * The user-specified config map that we forward to Elasticsearch when we create the Client.
- */
- private final Map<String, String> userConfig;
-
- /**
- * The list of nodes that the TransportClient should connect to. This must be non-null
- * and non-empty.
- */
- private final List<InetSocketAddress> transportAddresses;
-
- /**
- * The builder that is used to construct an {@link IndexRequest} from the incoming element.
- */
- private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
- /**
- * The Client that was either retrieved from a Node or is a TransportClient.
- */
- private transient Client client;
-
- /**
- * Bulk processor that was created using the client
- */
- private transient BulkProcessor bulkProcessor;
-
- /**
- * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
- */
- private transient RequestIndexer requestIndexer;
-
- /**
- * This is set from inside the BulkProcessor listener if there were failures during processing.
- */
- private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
- /**
- * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
- */
- private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
- /**
- * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
- *
- * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
- * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
- * @param elasticsearchSinkFunction This is used to generate an ActionRequest from each incoming element
- */
- public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this.userConfig = userConfig;
- this.elasticsearchSinkFunction = elasticsearchSinkFunction;
- Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
- this.transportAddresses = transportAddresses;
- }
-
- /**
- * Initializes the connection to Elasticsearch by creating a
- * {@link org.elasticsearch.client.transport.TransportClient}.
- */
- @Override
- public void open(Configuration configuration) {
- List<TransportAddress> transportNodes = new ArrayList<>(transportAddresses.size());
- for (InetSocketAddress address : transportAddresses) {
- transportNodes.add(new InetSocketTransportAddress(address));
- }
-
- Settings settings = Settings.settingsBuilder().put(userConfig).build();
-
- TransportClient transportClient = TransportClient.builder().settings(settings).build();
- for (TransportAddress transport: transportNodes) {
- transportClient.addTransportAddress(transport);
- }
-
- // verify that we actually are connected to a cluster
- ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
- if (nodes.isEmpty()) {
- throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
- }
-
- client = transportClient;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch TransportClient {}", client);
- }
-
- BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
-
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- for (BulkItemResponse itemResp : response.getItems()) {
- if (itemResp.isFailed()) {
- LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
- failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
- }
- }
- hasFailure.set(true);
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: " + failure.getMessage(), failure);
- failureThrowable.compareAndSet(null, failure);
- hasFailure.set(true);
- }
- });
-
- // This makes flush() blocking
- bulkProcessorBuilder.setConcurrentRequests(0);
-
- ParameterTool params = ParameterTool.fromMap(userConfig);
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
- bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
- bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
- CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
- }
-
- bulkProcessor = bulkProcessorBuilder.build();
- requestIndexer = new BulkProcessorIndexer(bulkProcessor);
- }
-
- @Override
- public void invoke(T element) {
- elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
- }
-
- @Override
- public void close() {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- bulkProcessor = null;
- }
-
- if (client != null) {
- client.close();
- }
-
- if (hasFailure.get()) {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
- } else {
- throw new RuntimeException("An error occured in ElasticsearchSink.");
- }
- }
-
- }
-
-}
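
For reference, a compact sketch of the usage the javadoc above describes. It is a minimal sketch, assuming a cluster named "my-cluster" is reachable on 127.0.0.1:9300; the index, type, and job names are placeholders.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkSketch {
	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("message #0", "message #1");

		Map<String, String> config = new HashMap<>();
		// flush after every element; otherwise requests are buffered by the BulkProcessor
		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
		// must match the name of the target cluster (placeholder)
		config.put("cluster.name", "my-cluster");

		List<InetSocketAddress> transports = new ArrayList<>();
		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

		stream.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>() {
			@Override
			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
				Map<String, Object> json = new HashMap<>();
				json.put("data", element);
				indexer.add(Requests.indexRequest().index("my-index").type("my-type").source(json));
			}
		}));

		env.execute("Elasticsearch Sink Sketch");
	}
}

As in the tests below, setting bulk.flush.max.actions to 1 disables batching, which is convenient for demos but not for throughput.
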
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
deleted file mode 100644
index 55ba720..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * A function that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a stream.
- *
- * <p>
- * This is used by {@link ElasticsearchSink} to prepare elements for sending to Elasticsearch.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * private static class TestElasticSearchSinkFunction implements
- * ElasticsearchSinkFunction<Tuple2<Integer, String>> {
- *
- * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
- * Map<String, Object> json = new HashMap<>();
- * json.put("data", element.f1);
- *
- * return Requests.indexRequest()
- * .index("my-index")
- * .type("my-type")
- * .id(element.f0.toString())
- * .source(json);
- * }
- *
- * public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
- * indexer.add(createIndexRequest(element));
- * }
- * }
- *
- * }</pre>
- *
- * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
- */
-public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
- void process(T element, RuntimeContext ctx, RequestIndexer indexer);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
deleted file mode 100644
index 144a87b..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-
-import java.io.Serializable;
-
-public interface RequestIndexer extends Serializable {
- void add(ActionRequest... actionRequests);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
deleted file mode 100644
index bc9bedc..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
- private static final int NUM_ELEMENTS = 20;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Test
- public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element; otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- // Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element; otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEmptyTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element; otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test(expected = JobExecutionException.class)
- public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when there is no cluster to
- // connect to. There isn't a similar test for the Node Client version since that
- // one will block and wait for a cluster to come online
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element; otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch Node Client Test");
- }
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element.f1);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element.f0.toString())
- .source(json);
- }
-
- @Override
- public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
deleted file mode 100644
index 05760e8..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it, you must ensure that
- * you have a cluster named "elasticsearch" running, or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<String> source =
- env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
- /**
- * The mapping method. Takes an element from the input data set and transforms
- * it into exactly one element.
- *
- * @param value The input value.
- * @return The transformed value
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- @Override
- public String map(Long value) throws Exception {
- return "message #" + value;
- }
- });
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element; otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }));
-
- env.execute("Elasticsearch Example");
- }
-
- private static IndexRequest createIndexRequest(String element) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element)
- .source(json);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
deleted file mode 100644
index dc20726..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
deleted file mode 100644
index 7a077c2..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming.connectors.elasticsearch2" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
deleted file mode 100644
index 20c48c6..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ /dev/null
@@ -1,163 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-filesystem_2.10</artifactId>
- <name>flink-connector-filesystem</name>
-
- <packaging>jar</packaging>
-
- <!--
- This is a Hadoop2 only flink module.
- -->
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-hadoop2</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- <version>${minikdc.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
-
- <!--
- https://issues.apache.org/jira/browse/DIRSHARED-134
- Required to pull the Mini-KDC transitive dependency
- -->
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>3.0.1</version>
- <inherited>true</inherited>
- <extensions>true</extensions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!--
- Enforce single threaded execution to avoid port conflicts when running
- secure mini DFS cluster
- -->
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
deleted file mode 100644
index 3e3c86b..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
-* Implementation of an Avro key/value writer that can be used in a sink.
-* Each entry is wrapped in a GenericRecord with key/value fields (same as in the map/reduce library).
-<pre>
-Usage:
-{@code
- BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path");
- sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/"));
- sink.setPendingSuffix(".avro");
- Map<String,String> properties = new HashMap<>();
- Schema longSchema = Schema.create(Type.LONG);
- String keySchema = longSchema.toString();
- String valueSchema = longSchema.toString();
- properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
- properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
- properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
- properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
-
- sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
- sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
-}
-</pre>
-*/
-public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
- private static final long serialVersionUID = 1L;
- public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
- public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
- public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
- public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
- public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
- public static final String CONF_XZ_LEVEL = "avro.xz.level";
-
- private transient AvroKeyValueWriter<K, V> keyValueWriter;
-
- private final Map<String, String> properties;
-
- /**
- * Creates a writer.
- * <p>
- * The properties map configures the Avro key-value writer (see the example above).
- * @param properties the writer configuration
- */
- @SuppressWarnings("deprecation")
- public AvroKeyValueSinkWriter(Map<String, String> properties) {
- this.properties = properties;
-
- String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
- if (keySchemaString == null) {
- throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
- }
- Schema.parse(keySchemaString); // verify that the schema is valid
-
- String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
- if (valueSchemaString == null) {
- throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
- }
- Schema.parse(valueSchemaString); // verify that the schema is valid
- }
-
- private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
- String value = conf.get(key);
- if (value == null) {
- return def;
- }
- return Boolean.parseBoolean(value);
- }
-
- private int getInt(Map<String,String> conf, String key, int def) {
- String value = conf.get(key);
- if (value == null) {
- return def;
- }
- return Integer.parseInt(value);
- }
-
- // this is derived from AvroOutputFormatBase.getCompressionCodec(..)
- private CodecFactory getCompressionCodec(Map<String,String> conf) {
- if (getBoolean(conf, CONF_COMPRESS, false)) {
- int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
- int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
-
- String outputCodec = conf.get(CONF_COMPRESS_CODEC);
-
- if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
- return CodecFactory.deflateCodec(deflateLevel);
- } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
- return CodecFactory.xzCodec(xzLevel);
- } else {
- return CodecFactory.fromString(outputCodec);
- }
- }
- return CodecFactory.nullCodec();
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void open(FileSystem fs, Path path) throws IOException {
- super.open(fs, path);
-
- CodecFactory compressionCodec = getCompressionCodec(properties);
- Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
- Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
- keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream());
- }
-
- @Override
- public void close() throws IOException {
- super.close(); // the order is important since super.close() flushes internally
- if (keyValueWriter != null) {
- keyValueWriter.close();
- }
- }
-
- @Override
- public long flush() throws IOException {
- if (keyValueWriter != null) {
- keyValueWriter.sync();
- }
- return super.flush();
- }
-
- @Override
- public void write(Tuple2<K, V> element) throws IOException {
- getStream(); // Throws if the stream is not open
- keyValueWriter.write(element.f0, element.f1);
- }
-
- @Override
- public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- if (!type.isTupleType()) {
- throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
- }
-
- TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
-
- if (tupleType.getArity() != 2) {
- throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
- }
- }
-
- @Override
- public Writer<Tuple2<K, V>> duplicate() {
- return new AvroKeyValueSinkWriter<K, V>(properties);
- }
-
- // taken from the map/reduce Avro library to avoid a dependency on it
- private static final class AvroKeyValueWriter<K, V> {
- /** A writer for the Avro container file. */
- private final DataFileWriter<GenericRecord> mAvroFileWriter;
-
- /**
- * The writer schema for the generic record entries of the Avro
- * container file.
- */
- private final Schema mKeyValuePairSchema;
-
- /**
- * A reusable Avro generic record for writing key/value pairs to the
- * file.
- */
- private final AvroKeyValue<Object, Object> mOutputRecord;
-
- AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
- CodecFactory compressionCodec, OutputStream outputStream,
- int syncInterval) throws IOException {
- // Create the generic record schema for the key/value pair.
- mKeyValuePairSchema = AvroKeyValue
- .getSchema(keySchema, valueSchema);
-
- // Create an Avro container file and a writer to it.
- DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
- mKeyValuePairSchema);
- mAvroFileWriter = new DataFileWriter<GenericRecord>(
- genericDatumWriter);
- mAvroFileWriter.setCodec(compressionCodec);
- mAvroFileWriter.setSyncInterval(syncInterval);
- mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
-
- // Create a reusable output record.
- mOutputRecord = new AvroKeyValue<Object, Object>(
- new GenericData.Record(mKeyValuePairSchema));
- }
-
- AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
- CodecFactory compressionCodec, OutputStream outputStream)
- throws IOException {
- this(keySchema, valueSchema, compressionCodec, outputStream,
- DataFileConstants.DEFAULT_SYNC_INTERVAL);
- }
-
- void write(K key, V value) throws IOException {
- mOutputRecord.setKey(key);
- mOutputRecord.setValue(value);
- mAvroFileWriter.append(mOutputRecord.get());
- }
-
- void close() throws IOException {
- mAvroFileWriter.close();
- }
-
- long sync() throws IOException {
- return mAvroFileWriter.sync();
- }
- }
-
- // taken from AvroKeyValue in the avro-mapred library
- public static class AvroKeyValue<K, V> {
- /** The name of the key value pair generic record. */
- public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
-
- /** The namespace of the key value pair generic record. */
- public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
-
- /** The name of the generic record field containing the key. */
- public static final String KEY_FIELD = "key";
-
- /** The name of the generic record field containing the value. */
- public static final String VALUE_FIELD = "value";
-
- /** The key/value generic record wrapped by this class. */
- public final GenericRecord mKeyValueRecord;
-
- /**
- * Wraps a GenericRecord that is a key value pair.
- */
- public AvroKeyValue(GenericRecord keyValueRecord) {
- mKeyValueRecord = keyValueRecord;
- }
-
- public GenericRecord get() {
- return mKeyValueRecord;
- }
-
- public void setKey(K key) {
- mKeyValueRecord.put(KEY_FIELD, key);
- }
-
- public void setValue(V value) {
- mKeyValueRecord.put(VALUE_FIELD, value);
- }
-
- @SuppressWarnings("unchecked")
- public K getKey() {
- return (K) mKeyValueRecord.get(KEY_FIELD);
- }
-
- @SuppressWarnings("unchecked")
- public V getValue() {
- return (V) mKeyValueRecord.get(VALUE_FIELD);
- }
-
- /**
- * Creates a KeyValuePair generic record schema.
- *
- * @return A schema for a generic record with two fields: 'key' and
- * 'value'.
- */
- public static Schema getSchema(Schema keySchema, Schema valueSchema) {
- Schema schema = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME,
- "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
- schema.setFields(Arrays.asList(new Schema.Field(KEY_FIELD,
- keySchema, "The key", null), new Schema.Field(VALUE_FIELD,
- valueSchema, "The value", null)));
- return schema;
- }
- }
-}
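
For reference, a minimal sketch of using the public AvroKeyValue helper above on its own: it composes the generic "KeyValuePair" schema from a key and a value schema and wraps a record. The values below are arbitrary.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;

public class AvroKeyValueSketch {
	public static void main(String[] args) {
		Schema longSchema = Schema.create(Schema.Type.LONG);

		// compose the "KeyValuePair" record schema with 'key' and 'value' fields
		Schema pairSchema = AvroKeyValue.getSchema(longSchema, longSchema);

		// wrap a generic record and populate both fields
		AvroKeyValue<Long, Long> pair = new AvroKeyValue<>(new GenericData.Record(pairSchema));
		pair.setKey(1L);
		pair.setValue(42L);

		System.out.println(pair.get()); // {"key": 1, "value": 42}
	}
}
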
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
deleted file mode 100644
index 24ad6ab..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * A bucketer is used with a {@link RollingSink}
- * to put emitted elements into rolling files.
- *
- * <p>
- * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
- * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
- * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
- * based on system time.
- *
- * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead.
- */
-@Deprecated
-public interface Bucketer extends Serializable {
-
- /**
- * Returns {@code true} when a new bucket should be started.
- *
- * @param currentBucketPath The bucket {@code Path} that is currently being used.
- */
- boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
-
- /**
- * Returns the {@link Path} of a new bucket file.
- *
- * @param basePath The base path containing all the buckets.
- *
- * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath}
- * and also the {@code subtaskIndex} to avoid clashes with parallel sinks.
- */
- Path getNextBucketPath(Path basePath);
-}
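A rough sketch of what a custom implementation of this (now deprecated) interface looks like; this hypothetical bucketer, not part of the connector, simply puts everything into one fixed subdirectory:

    import org.apache.hadoop.fs.Path;

    // Hypothetical example: writes all elements into a single
    // "archive" subdirectory under the base path.
    public class SingleDirBucketer implements Bucketer {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
            return false; // never roll to a new bucket
        }

        @Override
        public Path getNextBucketPath(Path basePath) {
            return new Path(basePath, "archive");
        }
    }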
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
deleted file mode 100644
index 174707c..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-
-/**
- * A clock that can provide the current time.
- *
- * <p>
- * Normally this would be system time, but for testing a custom {@code Clock} can be provided.
- */
-public interface Clock {
-
- /**
- * Returns the current system time in milliseconds.
- */
- public long currentTimeMillis();
-}
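Since this interface exists mainly so tests can swap in a deterministic time source, a minimal test double might look like the following (illustrative only):

    // Illustrative test double: always reports a fixed instant.
    public class FixedClock implements Clock {
        private final long fixedTime;

        public FixedClock(long fixedTime) {
            this.fixedTime = fixedTime;
        }

        @Override
        public long currentTimeMillis() {
            return fixedTime;
        }
    }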
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
deleted file mode 100644
index 0df8998..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
- *
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
- * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
- * that was specified as a base path when creating the
- * {@link RollingSink}. The {@code dateTimePath}
- * is determined based on the current system time and the user provided format string.
- *
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
- * files will have a granularity of hours.
- *
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
- * }</pre>
- *
- * This will create for example the following bucket path:
- * {@code /base/1976-12-31-14/}
- *
- * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
- */
-@Deprecated
-public class DateTimeBucketer implements Bucketer {
-
- private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
-
- private static final long serialVersionUID = 1L;
-
- private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
-
- // We have this so that we can manually set it for tests.
- private static Clock clock = new SystemClock();
-
- private final String formatString;
-
- private transient SimpleDateFormat dateFormatter;
-
- /**
- * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
- */
- public DateTimeBucketer() {
- this(DEFAULT_FORMAT_STRING);
- }
-
- /**
- * Creates a new {@code DateTimeBucketer} with the given date/time format string.
- *
- * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
- * the bucket path.
- */
- public DateTimeBucketer(String formatString) {
- this.formatString = formatString;
-
- this.dateFormatter = new SimpleDateFormat(formatString);
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- this.dateFormatter = new SimpleDateFormat(formatString);
- }
-
-
- @Override
- public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
- String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
- return !(new Path(basePath, newDateTimeString).equals(currentBucketPath));
- }
-
- @Override
- public Path getNextBucketPath(Path basePath) {
- String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
- return new Path(basePath + "/" + newDateTimeString);
- }
-
- @Override
- public String toString() {
- return "DateTimeBucketer{" +
- "formatString='" + formatString + '\'' +
- '}';
- }
-
- /**
- * Sets the internal {@link Clock} implementation. This method should only be used for testing.
- *
- * @param newClock The new clock to set.
- */
- public static void setClock(Clock newClock) {
- clock = newClock;
- }
-}
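Typical wiring of this bucketer into the (likewise deprecated) RollingSink, following the Javadoc example above; a sketch, assuming a DataStream<String> named input:

    RollingSink<String> sink = new RollingSink<>("/base/path");
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH")); // hourly buckets
    input.addSink(sink);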
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
deleted file mode 100644
index 6854596..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link Bucketer} that does not perform any
- * rolling of files. All files are written to the base path.
- *
- * @deprecated use {@link BasePathBucketer} instead.
- */
-@Deprecated
-public class NonRollingBucketer implements Bucketer {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
- return false;
- }
-
- @Override
- public Path getNextBucketPath(Path basePath) {
- return basePath;
- }
-
- @Override
- public String toString() {
- return "NonRollingBucketer";
- }
-}
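Since the class is deprecated in favor of BasePathBucketer, migration is essentially a one-line swap onto the newer BucketingSink; a sketch, assuming the bucketing API present in this snapshot:

    BucketingSink<String> sink = new BucketingSink<>("/base/path");
    sink.setBucketer(new BasePathBucketer<String>()); // all files under the base path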