You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:08 UTC
[18/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
deleted file mode 100644
index a3d002e..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a
- * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
- * if a checkpoint is completed.
- *
- * <p>Every tuple field is bound, in order, as a value of the prepared insert query.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
-    private static final long serialVersionUID = 1L;
-
-    /** Cassandra connection handles; created in {@link #open()} and released in {@link #close()}. */
-    protected transient Cluster cluster;
-    protected transient Session session;
-
-    /** CQL statement that is prepared once in {@link #open()} and bound again for every element. */
-    private final String insertQuery;
-    private transient PreparedStatement preparedStatement;
-
-    /** Factory for the {@link Cluster}; kept as a non-transient field of the sink. */
-    private final ClusterBuilder builder;
-
-    /** Scratch array, reused for every element, holding the tuple fields handed to the statement. */
-    private transient Object[] fields;
-
-    /**
-     * Creates the sink.
-     *
-     * @param insertQuery CQL insert statement whose values are filled from the tuple fields
-     * @param serializer  serializer for the incoming elements; {@link #open()} casts it to a
-     *                    {@link TupleSerializer} to look up the tuple arity
-     * @param builder     factory for the Cassandra {@link Cluster}
-     * @param committer   checkpoint committer handed to the super class
-     * @throws Exception if the super constructor fails
-     */
-    protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
-        // a random UUID with '-' replaced by '_' is the third super-constructor argument
-        // (presumably the sink id -- confirm against GenericWriteAheadSink)
-        super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
-        this.insertQuery = insertQuery;
-        this.builder = builder;
-        ClosureCleaner.clean(builder, true);
-    }
-
-    /**
-     * Connects to Cassandra, prepares the insert statement and allocates the field buffer.
-     *
-     * @throws IllegalStateException if checkpointing is not enabled for the job
-     */
-    @Override
-    public void open() throws Exception {
-        super.open();
-        if (!getRuntimeContext().isCheckpointingEnabled()) {
-            throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
-        }
-        cluster = builder.getCluster();
-        session = cluster.connect();
-        preparedStatement = session.prepare(insertQuery);
-
-        fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
-    }
-
-    /**
-     * Closes the super class first and then the Cassandra session and cluster. The Cassandra
-     * resources are released even if the super class fails to close; failures while closing them
-     * are logged and not rethrown.
-     */
-    @Override
-    public void close() throws Exception {
-        try {
-            super.close();
-        } finally {
-            // always release the connection, otherwise a failing super.close() leaks it
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
-            }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
-            }
-        }
-    }
-
-    /**
-     * Sends all given values to Cassandra asynchronously and blocks until every update has been
-     * confirmed or the first failure has been reported.
-     *
-     * @param values    elements to write
-     * @param timestamp value set as the default timestamp of every statement
-     * @return {@code true} if all updates were confirmed, {@code false} if at least one failed
-     * @throws Exception if binding or sending fails, or the wait is interrupted
-     */
-    @Override
-    protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
-        // updatesCount stays 0 until every statement has been sent; see onSuccess()
-        final AtomicInteger updatesCount = new AtomicInteger(0);
-        // doubles as the monitor the sending thread waits on
-        final AtomicInteger updatesConfirmed = new AtomicInteger(0);
-
-        // first failure reported by any callback
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        // NOTE(review): the callback presumably runs on a driver thread, not on the caller -- confirm
-        FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
-            @Override
-            public void onSuccess(ResultSet resultSet) {
-                updatesConfirmed.incrementAndGet();
-                if (updatesCount.get() > 0) { // only set if all updates have been sent
-                    if (updatesCount.get() == updatesConfirmed.get()) {
-                        synchronized (updatesConfirmed) {
-                            updatesConfirmed.notifyAll();
-                        }
-                    }
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                // only the first failure is recorded and wakes up the waiting sender
-                if (exception.compareAndSet(null, throwable)) {
-                    LOG.error("Error while sending value.", throwable);
-                    synchronized (updatesConfirmed) {
-                        updatesConfirmed.notifyAll();
-                    }
-                }
-            }
-        };
-
-        //set values for prepared statement
-        int updatesSent = 0;
-        for (IN value : values) {
-            for (int x = 0; x < value.getArity(); x++) {
-                fields[x] = value.getField(x);
-            }
-            //insert values and send to cassandra
-            BoundStatement s = preparedStatement.bind(fields);
-            s.setDefaultTimestamp(timestamp);
-            ResultSetFuture result = session.executeAsync(s);
-            updatesSent++;
-            if (result != null) {
-                //add callback to detect errors
-                Futures.addCallback(result, callback);
-            }
-        }
-        // publish the total so that the callbacks can detect the last confirmation
-        updatesCount.set(updatesSent);
-
-        // the condition is re-checked under the monitor, so a confirmation that arrived before
-        // this point is not missed
-        synchronized (updatesConfirmed) {
-            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
-                updatesConfirmed.wait();
-            }
-        }
-
-        if (exception.get() != null) {
-            LOG.warn("Sending a value failed.", exception.get());
-            return false;
-        } else {
-            return true;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
deleted file mode 100644
index 9fd3b4e..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-
-import java.io.Serializable;
-
-/**
- * Serializable factory used to configure a {@link com.datastax.driver.core.Cluster} after deployment.
- * The cluster represents the connection that will be established to Cassandra; subclasses decide how
- * it is configured by implementing {@link #buildCluster(Cluster.Builder)}.
- */
-public abstract class ClusterBuilder implements Serializable {
-
-    /**
-     * Obtains a new {@link Cluster.Builder} from {@link Cluster#builder()} and hands it to
-     * {@link #buildCluster(Cluster.Builder)}.
-     *
-     * @return the cluster returned by {@code buildCluster}
-     */
-    public Cluster getCluster() {
-        final Cluster.Builder freshBuilder = Cluster.builder();
-        return buildCluster(freshBuilder);
-    }
-
-    /**
-     * Configures the connection to Cassandra.
-     * Implementations call the configuration methods of the given builder
-     * and finish the configuration with build().
-     *
-     * @param builder connection builder
-     * @return configured connection
-     */
-    protected abstract Cluster buildCluster(Cluster.Builder builder);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
deleted file mode 100644
index e66b8b3..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-/**
- * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
- *
- * <p>The example assumes that a table exists in a local cassandra database, according to the following query:
- * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));
- */
-public class BatchExample {
-    private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
-    private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
-
-    /**
-     * Writes 20 generated (number, string) rows with {@link CassandraOutputFormat}, then reads the
-     * table back with {@link CassandraInputFormat} and prints the result.
-     *
-     * <p>table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));"
-     */
-    public static void main(String[] args) throws Exception {
-
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        // rows (0, "string 0") .. (19, "string 19")
-        final ArrayList<Tuple2<Integer, String>> rows = new ArrayList<>(20);
-        int number = 0;
-        while (number < 20) {
-            rows.add(new Tuple2<>(number, "string " + number));
-            number++;
-        }
-
-        // write phase
-        final CassandraOutputFormat<Tuple2<Integer, String>> cassandraOut =
-            new CassandraOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, localCluster());
-        final DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(rows);
-        dataSet.output(cassandraOut);
-
-        env.execute("Write");
-
-        // read phase
-        final CassandraInputFormat<Tuple2<Integer, String>> cassandraIn =
-            new CassandraInputFormat<Tuple2<Integer, String>>(SELECT_QUERY, localCluster());
-        final DataSet<Tuple2<Integer, String>> inputDS = env
-            .createInput(cassandraIn, TupleTypeInfo.of(new TypeHint<Tuple2<Integer, String>>() {
-            }));
-
-        inputDS.print();
-    }
-
-    /** Creates a new builder for a cluster whose only contact point is 127.0.0.1. */
-    private static ClusterBuilder localCluster() {
-        return new ClusterBuilder() {
-            @Override
-            protected Cluster buildCluster(Builder builder) {
-                return builder.addContactPoints("127.0.0.1").build();
-            }
-        };
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
deleted file mode 100644
index 2bb6fd1..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.TestEnvironment;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Scanner;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-/**
- * Integration tests for the Cassandra connectors. The tests run against the keyspace {@code flink}
- * and the table {@code flink.test}, which are created once per class on a Cassandra instance
- * reachable at 127.0.0.1 (started in-process when {@code EMBEDDED} is {@code true}), and against a
- * local Flink mini cluster.
- */
-@SuppressWarnings("serial")
-public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-    private static File tmpDir;
-
-    private static final boolean EMBEDDED = true;
-
-    private static EmbeddedCassandraService cassandra;
-
-    /** Cluster configuration shared by the test itself and by all sinks/formats under test. */
-    private static ClusterBuilder builder = new ClusterBuilder() {
-        @Override
-        protected Cluster buildCluster(Cluster.Builder builder) {
-            return builder
-                .addContactPoint("127.0.0.1")
-                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-                .withoutJMXReporting()
-                .withoutMetrics().build();
-        }
-    };
-
-    private static Cluster cluster;
-    private static Session session;
-
-    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
-    private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
-    private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
-    private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
-    private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
-
-    /** 20 rows with random ids, counters 0..19 and batch id 0, used by the collection-based tests. */
-    private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
-
-    static {
-        for (int i = 0; i < 20; i++) {
-            collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
-        }
-    }
-
-    /** Thin wrapper that starts and stops a {@link CassandraDaemon} inside the test JVM. */
-    private static class EmbeddedCassandraService {
-        CassandraDaemon cassandraDaemon;
-
-        public void start() throws IOException {
-            this.cassandraDaemon = new CassandraDaemon();
-            this.cassandraDaemon.init(null);
-            this.cassandraDaemon.start();
-        }
-
-        public void stop() {
-            // start() may have failed before the daemon was created
-            if (this.cassandraDaemon != null) {
-                this.cassandraDaemon.stop();
-            }
-        }
-    }
-
-    private static LocalFlinkMiniCluster flinkCluster;
-
-    // ------------------------------------------------------------------------
-    //  Cluster Setup (Cassandra & Flink)
-    // ------------------------------------------------------------------------
-
-    /**
-     * Copies {@code cassandra.yaml} from the class path into a temporary directory (replacing the
-     * {@code $PATH} placeholder), starts Cassandra, connects to it and creates keyspace and table.
-     */
-    @BeforeClass
-    public static void startCassandra() throws IOException {
-
-        // check if we should run this test, current Cassandra version requires Java >= 1.8
-        org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
-
-        // generate temporary files
-        tmpDir = CommonTestUtils.createTempDirectory();
-        ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
-        File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-        File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
-        assertTrue(tmp.createNewFile());
-
-        try (
-            BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
-            //copy cassandra.yaml; inject absolute paths into cassandra.yaml
-            Scanner scanner = new Scanner(file);
-        ) {
-            while (scanner.hasNextLine()) {
-                String line = scanner.nextLine();
-                line = line.replace("$PATH", "'" + tmp.getParentFile());
-                b.write(line + "\n");
-                b.flush();
-            }
-        }
-
-
-        // Tell cassandra where the configuration files are.
-        // Use the test configuration file.
-        System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
-
-        if (EMBEDDED) {
-            cassandra = new EmbeddedCassandraService();
-            cassandra.start();
-        }
-
-        try {
-            //give cassandra a few seconds to start up
-            Thread.sleep(1000 * 10);
-        } catch (InterruptedException e) {
-            // keep the interruption visible to the caller instead of swallowing it
-            Thread.currentThread().interrupt();
-        }
-
-        cluster = builder.getCluster();
-        session = cluster.connect();
-
-        session.execute(CREATE_KEYSPACE_QUERY);
-        session.execute(CREATE_TABLE_QUERY);
-    }
-
-    /** Starts a local Flink mini cluster configured with 4 task slots. */
-    @BeforeClass
-    public static void startFlink() throws Exception {
-        Configuration config = new Configuration();
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-        flinkCluster = new LocalFlinkMiniCluster(config);
-        flinkCluster.start();
-    }
-
-    @AfterClass
-    public static void stopFlink() {
-        if (flinkCluster != null) {
-            flinkCluster.stop();
-            flinkCluster = null;
-        }
-    }
-
-    /** Drops the keyspace and releases session, cluster, embedded Cassandra and temp directory. */
-    @AfterClass
-    public static void closeCassandra() {
-        if (session != null) {
-            session.executeAsync(DROP_KEYSPACE_QUERY);
-            session.close();
-        }
-
-        if (cluster != null) {
-            cluster.close();
-        }
-
-        if (cassandra != null) {
-            cassandra.stop();
-        }
-
-        if (tmpDir != null) {
-            //noinspection ResultOfMethodCallIgnored
-            tmpDir.delete();
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  Test preparation & cleanup
-    // ------------------------------------------------------------------------
-
-    @Before
-    public void initializeExecutionEnvironment() {
-        TestStreamEnvironment.setAsContext(flinkCluster, 4);
-        new TestEnvironment(flinkCluster, 4, false).setAsContext();
-    }
-
-    /** Empties the test table after every test. */
-    @After
-    public void deleteSchema() throws Exception {
-        // wait for the TRUNCATE to finish; a fire-and-forget call could still be running
-        // while the next test inserts and counts rows
-        session.execute(CLEAR_TABLE_QUERY);
-    }
-
-    // ------------------------------------------------------------------------
-    //  Exactly-once Tests
-    // ------------------------------------------------------------------------
-
-    @Override
-    protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-        return new CassandraTupleWriteAheadSink<>(
-            INSERT_DATA_QUERY,
-            TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
-            builder,
-            new CassandraCommitter(builder));
-    }
-
-    @Override
-    protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
-        return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
-    }
-
-    @Override
-    protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
-        return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
-    }
-
-    /** Expects the counters 1..60 to be present in the table. */
-    @Override
-    protected void verifyResultsIdealCircumstances(
-        OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-        CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        assertCountersStored(counterRange(1, 60));
-    }
-
-    /** Expects the counters 1..60 to be present in the table. */
-    @Override
-    protected void verifyResultsDataPersistenceUponMissedNotify(
-        OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-        CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        assertCountersStored(counterRange(1, 60));
-    }
-
-    /** Expects the counters 1..20 and 41..60 to be present in the table. */
-    @Override
-    protected void verifyResultsDataDiscardingUponRestore(
-        OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-        CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        ArrayList<Integer> expected = counterRange(1, 20);
-        expected.addAll(counterRange(41, 60));
-        assertCountersStored(expected);
-    }
-
-    /** Returns a mutable list holding every integer from {@code first} to {@code last}, both inclusive. */
-    private static ArrayList<Integer> counterRange(int first, int last) {
-        ArrayList<Integer> list = new ArrayList<>();
-        for (int x = first; x <= last; x++) {
-            list.add(x);
-        }
-        return list;
-    }
-
-    /**
-     * Reads the whole test table and fails if any of the expected counter values is missing.
-     * Rows with other counter values are ignored. The given list is consumed by the check.
-     */
-    private static void assertCountersStored(ArrayList<Integer> expected) {
-        ResultSet result = session.execute(SELECT_DATA_QUERY);
-        for (Row row : result) {
-            // remove(Object), not remove(int index)
-            expected.remove(Integer.valueOf(row.getInt("counter")));
-        }
-        Assert.assertTrue("The following ID's were not found in the ResultSet: " + expected.toString(), expected.isEmpty());
-    }
-
-    @Test
-    public void testCassandraCommitter() throws Exception {
-        CassandraCommitter cc1 = new CassandraCommitter(builder);
-        cc1.setJobId("job");
-        cc1.setOperatorId("operator");
-
-        CassandraCommitter cc2 = new CassandraCommitter(builder);
-        cc2.setJobId("job");
-        cc2.setOperatorId("operator");
-
-        CassandraCommitter cc3 = new CassandraCommitter(builder);
-        cc3.setJobId("job");
-        cc3.setOperatorId("operator1");
-
-        cc1.createResource();
-
-        cc1.open();
-        cc2.open();
-        cc3.open();
-
-        Assert.assertFalse(cc1.isCheckpointCommitted(0, 1));
-        Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
-        Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
-
-        cc1.commitCheckpoint(0, 1);
-        Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
-        //verify that other sub-tasks aren't affected
-        Assert.assertFalse(cc2.isCheckpointCommitted(1, 1));
-        //verify that other tasks aren't affected
-        Assert.assertFalse(cc3.isCheckpointCommitted(0, 1));
-
-        Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
-
-        cc1.close();
-        cc2.close();
-        cc3.close();
-
-        cc1 = new CassandraCommitter(builder);
-        cc1.setJobId("job");
-        cc1.setOperatorId("operator");
-
-        cc1.open();
-
-        //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-        Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
-        Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
-
-        cc1.close();
-    }
-
-    // ------------------------------------------------------------------------
-    //  At-least-once Tests
-    // ------------------------------------------------------------------------
-
-    @Test
-    public void testCassandraTupleAtLeastOnceSink() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-        source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
-        env.execute();
-
-        ResultSet rs = session.execute(SELECT_DATA_QUERY);
-        Assert.assertEquals(20, rs.all().size());
-    }
-
-    @Test
-    public void testCassandraPojoAtLeastOnceSink() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        DataStreamSource<Pojo> source = env
-            .addSource(new SourceFunction<Pojo>() {
-
-                // written by cancel(), which may be called from a thread other than the one in run()
-                private volatile boolean running = true;
-                // only touched inside run()
-                private int cnt = 0;
-
-                @Override
-                public void run(SourceContext<Pojo> ctx) throws Exception {
-                    while (running) {
-                        ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
-                        cnt++;
-                        if (cnt == 20) {
-                            cancel();
-                        }
-                    }
-                }
-
-                @Override
-                public void cancel() {
-                    running = false;
-                }
-            });
-
-        source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
-
-        env.execute();
-
-        ResultSet rs = session.execute(SELECT_DATA_QUERY);
-        Assert.assertEquals(20, rs.all().size());
-    }
-
-    @Test
-    public void testCassandraBatchFormats() throws Exception {
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
-        dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
-        env.execute("Write data");
-
-        DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
-            new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-            TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
-
-
-        long count = inputDS.count();
-        // expected value first, so that a failure message reports the numbers the right way round
-        Assert.assertEquals(20L, count);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
deleted file mode 100644
index 847d1a0..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Collections;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Unit test for {@link CassandraTupleWriteAheadSink} that runs against a fully mocked Cassandra driver.
- */
-public class CassandraTupleWriteAheadSinkTest {
-
- /**
- * Verifies that {@code sendValues} returns {@code false} when the future of an asynchronous
- * write fails.
- *
- * <p>The mocked {@link ResultSetFuture} throws from {@code get()}. The listener that is registered
- * on the future is captured and executed by a helper thread that stands in for the driver's
- * asynchronous executor.
- */
- @Test(timeout=20000)
- public void testAckLoopExitOnException() throws Exception {
- // Holds the listener registered on the mocked future; also used as the monitor for the hand-over.
- final AtomicReference<Runnable> runnableFuture = new AtomicReference<>();
-
- final ClusterBuilder clusterBuilder = new ClusterBuilder() {
- private static final long serialVersionUID = 4624400760492936756L;
-
- @Override
- protected Cluster buildCluster(Cluster.Builder builder) {
- try {
- BoundStatement boundStatement = mock(BoundStatement.class);
- when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);
-
- PreparedStatement preparedStatement = mock(PreparedStatement.class);
- when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);
-
- // Every write "fails": asking the future for its result throws.
- ResultSetFuture future = mock(ResultSetFuture.class);
- when(future.get()).thenThrow(new RuntimeException("Expected exception."));
-
- // Capture the registered listener and wake up the helper thread that will run it.
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- synchronized (runnableFuture) {
- runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0])));
- runnableFuture.notifyAll();
- }
- return null;
- }
- }).when(future).addListener(any(Runnable.class), any(Executor.class));
-
- Session session = mock(Session.class);
- when(session.prepare(anyString())).thenReturn(preparedStatement);
- when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);
-
- Cluster cluster = mock(Cluster.class);
- when(cluster.connect()).thenReturn(session);
- return cluster;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
-
- // Our asynchronous executor thread
- new Thread(new Runnable() {
- @Override
- public void run() {
- synchronized (runnableFuture) {
- // Loop guards against spurious wake-ups: only leave once a listener has been captured.
- while (runnableFuture.get() == null) {
- try {
- runnableFuture.wait();
- } catch (InterruptedException e) {
- // ignore interrupts
- }
- }
- }
- runnableFuture.get().run();
- }
- }).start();
-
- CheckpointCommitter cc = mock(CheckpointCommitter.class);
- final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>(
- "abc",
- TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()),
- clusterBuilder,
- cc
- );
-
- // Diamond instead of a raw type, so the assignment is checked by the compiler.
- OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness<>(sink);
- harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true);
-
- harness.setup();
- sink.open();
-
- // we should leave the loop and return false since we've seen an exception
- assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L));
-
- sink.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
deleted file mode 100644
index 9b331d6..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-import java.io.Serializable;
-
-// Test POJO annotated for the table "test" in keyspace "flink"; each field is annotated
-// with the name of the column it corresponds to.
-@Table(keyspace = "flink", name = "test")
-public class Pojo implements Serializable {
-
- private static final long serialVersionUID = 1038054554690916991L;
-
- // Annotated as column "id".
- @Column(name = "id")
- private String id;
- // Annotated as column "counter".
- @Column(name = "counter")
- private int counter;
- // Annotated as column "batch_id"; the underscore in the Java name mirrors the column name.
- // NOTE(review): presumably the accessor names below must stay in sync with this field name
- // for the mapper - confirm before renaming.
- @Column(name = "batch_id")
- private int batch_id;
-
- // Creates a fully initialized instance from the three column values.
- public Pojo(String id, int counter, int batch_id) {
- this.id = id;
- this.counter = counter;
- this.batch_id = batch_id;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public int getCounter() {
- return counter;
- }
-
- public void setCounter(int counter) {
- this.counter = counter;
- }
-
- public int getBatch_id() {
- return batch_id;
- }
-
- public void setBatch_id(int batch_id) {
- this.batch_id = batch_id;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
deleted file mode 100644
index e1bcea9..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-/**
- * Example showing how to use the POJO Cassandra sink in the Streaming API.
- *
- * POJOs have to be annotated with DataStax annotations to work with this sink.
- *
- * The example assumes that a table exists in a local Cassandra database, created by the following query:
- * CREATE TABLE IF NOT EXISTS test.message(body text PRIMARY KEY)
- */
-public class CassandraPojoSinkExample {
- private static final ArrayList<Message> messages = new ArrayList<>(20);
-
- static {
- // Pre-populate the input with the messages "cassandra-0" .. "cassandra-19".
- long index = 0;
- while (index < 20) {
- messages.add(new Message("cassandra-" + index));
- index++;
- }
- }
-
- public static void main(String[] args) throws Exception {
- // Connects to a Cassandra node on the local machine.
- ClusterBuilder localCluster = new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Builder builder) {
- Builder withContactPoint = builder.addContactPoint("127.0.0.1");
- return withContactPoint.build();
- }
- };
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Message> source = env.fromCollection(messages);
-
- CassandraSink.addSink(source)
- .setClusterBuilder(localCluster)
- .build();
-
- env.execute("Cassandra Sink example");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
deleted file mode 100644
index c6345df..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.ArrayList;
-
-/**
- * Example showing how to use the Tuple Cassandra sink in the Streaming API.
- *
- * The example assumes that a table exists in a local Cassandra database, created by the following query:
- * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)
- */
-public class CassandraTupleSinkExample {
- private static final String INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
- private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
-
- static {
- // Pre-populate the input with ("cassandra-0", 0) .. ("cassandra-19", 19).
- int index = 0;
- while (index < 20) {
- collection.add(new Tuple2<>("cassandra-" + index, index));
- index++;
- }
- }
-
- public static void main(String[] args) throws Exception {
- // Connects to a Cassandra node on the local machine.
- ClusterBuilder localCluster = new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Builder builder) {
- Builder withContactPoint = builder.addContactPoint("127.0.0.1");
- return withContactPoint.build();
- }
- };
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
-
- CassandraSink.addSink(source)
- .setQuery(INSERT)
- .setClusterBuilder(localCluster)
- .build();
-
- env.execute("WriteTupleIntoCassandra");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
deleted file mode 100644
index 811c410..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.util.UUID;
-
-/**
- * This is an example showing how to use the Cassandra Sink (with write-ahead log) in the Streaming API.
- *
- * The example assumes that a table exists in a local cassandra database, according to the following query:
- * CREATE TABLE example.values (id text, counter int, PRIMARY KEY(id));
- *
- * Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call
- * when creating the CassandraSink.
- */
-public class CassandraTupleWriteAheadSinkExample {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
- env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
-
- CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
- .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .enableWriteAheadLog()
- .setClusterBuilder(new ClusterBuilder() {
- @Override
- public Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- })
- .build();
-
- sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
-
- env.execute();
- }
-
- /**
- * Source that emits a tuple of a random UUID string and the constant 1 roughly every 50 ms,
- * and stops once 100 elements have been counted. The element count is the checkpointed state.
- */
- public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
- private int counter = 0;
- // volatile: cancel() may be invoked from a different thread than the one executing run(),
- // and the loop must observe the update.
- private volatile boolean stop = false;
-
- @Override
- public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
- while (!stop) {
- Thread.sleep(50);
- ctx.collect(new Tuple2<>("" + UUID.randomUUID(), 1));
- counter++;
- if (counter == 100) {
- stop = true;
- }
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return counter;
- }
-
- @Override
- public void restoreState(Integer state) throws Exception {
- this.counter = state;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
deleted file mode 100644
index 7524d95..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-import java.io.Serializable;
-
-/**
- * Example POJO annotated for the table "message" in keyspace "test"; its single field is
- * annotated as column "body".
- *
- * Two messages are equal when their texts are equal; a {@code null} text only equals another
- * {@code null} text.
- */
-@Table(keyspace = "test", name = "message")
-public class Message implements Serializable {
-
- private static final long serialVersionUID = 1123119384361005680L;
-
- @Column(name = "body")
- private String message;
-
- public Message(String word) {
- this.message = word;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String word) {
- this.message = word;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof Message) {
- Message that = (Message) other;
- // The constructor and setter accept null, so compare null-safely instead of throwing.
- return this.message == null ? that.message == null : this.message.equals(that.message);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- // Kept consistent with equals(): a null text hashes to 0.
- return message == null ? 0 : message.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
deleted file mode 100644
index 0594ea3..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ /dev/null
@@ -1,43 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-cluster_name: 'Test Cluster'
-commitlog_sync: 'periodic'
-commitlog_sync_period_in_ms: 10000
-commitlog_segment_size_in_mb: 16
-partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
-endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
-commitlog_directory: $PATH/commit'
-data_file_directories:
- - $PATH/data'
-saved_caches_directory: $PATH/cache'
-listen_address: '127.0.0.1'
-seed_provider:
- - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
- parameters:
- - seeds: '127.0.0.1'
-native_transport_port: 9042
-
-concurrent_reads: 8
-concurrent_writes: 8
-
-auto_bootstrap: false
-auto_snapshot: false
-
-start_rpc: false
-start_native_transport: true
-native_transport_max_threads: 8
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
deleted file mode 100644
index a43d556..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target= System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
deleted file mode 100644
index c5ba3d8..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-elasticsearch_2.10</artifactId>
- <name>flink-connector-elasticsearch</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <elasticsearch.version>1.7.1</elasticsearch.version>
- </properties>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
deleted file mode 100644
index ac14ade..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.collect.ImmutableList;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-
-/**
- * Sink that emits its input elements to an Elasticsearch cluster.
- *
- * <p>
- * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
- * the sink will create a local {@link Node} for communicating with the
- * Elasticsearch cluster. When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
- * be used instead.
- *
- * <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
- * to come online.
- *
- * <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
- *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
- * This will buffer elements before sending a request to the cluster. The behaviour of the
- * {@code BulkProcessor} can be configured using these config keys:
- * <ul>
- * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
- * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
- * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
- * settings in milliseconds
- * </ul>
- *
- * <p>
- * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
- *
- * @param <T> Type of the elements emitted by this sink
- */
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
- public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
- /**
- * The user specified config map that we forward to Elasticsearch when we create the Client.
- */
- private final Map<String, String> userConfig;
-
- /**
- * The list of nodes that the TransportClient should connect to. This is null if we are using
- * an embedded Node to get a Client.
- */
- private final List<TransportAddress> transportNodes;
-
- /**
- * The builder that is used to construct an {@link IndexRequest} from the incoming element.
- */
- private final IndexRequestBuilder<T> indexRequestBuilder;
-
- /**
- * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
- * if we are using a TransportClient.
- */
- private transient Node node;
-
- /**
- * The Client that was either retrieved from a Node or is a TransportClient.
- */
- private transient Client client;
-
- /**
- * Bulk processor that was created using the client
- */
- private transient BulkProcessor bulkProcessor;
-
- /**
- * This is set from inside the BulkProcessor listener if there where failures in processing.
- */
- private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
- /**
- * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
- */
- private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
- /**
- * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
- *
- * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
- * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
- */
- public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
- this.userConfig = userConfig;
- this.indexRequestBuilder = indexRequestBuilder;
- transportNodes = null;
- }
-
- /**
- * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
- *
- * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
- * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
- * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
- *
- */
- public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
- this.userConfig = userConfig;
- this.indexRequestBuilder = indexRequestBuilder;
- this.transportNodes = transportNodes;
- }
-
- /**
- * Initializes the connection to Elasticsearch by either creating an embedded
- * {@link org.elasticsearch.node.Node} and retrieving the
- * {@link org.elasticsearch.client.Client} from it or by creating a
- * {@link org.elasticsearch.client.transport.TransportClient}.
- */
- @Override
- public void open(Configuration configuration) {
- if (transportNodes == null) {
- // Make sure that we disable http access to our embedded node
- Settings settings =
- ImmutableSettings.settingsBuilder()
- .put(userConfig)
- .put("http.enabled", false)
- .build();
-
- node =
- nodeBuilder()
- .settings(settings)
- .client(true)
- .data(false)
- .node();
-
- client = node.client();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch Client {} from embedded Node", client);
- }
-
- } else {
- Settings settings = ImmutableSettings.settingsBuilder()
- .put(userConfig)
- .build();
-
- TransportClient transportClient = new TransportClient(settings);
- for (TransportAddress transport: transportNodes) {
- transportClient.addTransportAddress(transport);
- }
-
- // verify that we actually are connected to a cluster
- ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
- if (nodes.isEmpty()) {
- throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connected to nodes: " + nodes.toString());
- }
- }
-
- client = transportClient;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch TransportClient {}", client);
- }
- }
-
- BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
- client,
- new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId,
- BulkRequest request) {
-
- }
-
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- BulkResponse response) {
- if (response.hasFailures()) {
- for (BulkItemResponse itemResp : response.getItems()) {
- if (itemResp.isFailed()) {
- LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
- failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
- }
- }
- hasFailure.set(true);
- }
- }
-
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- Throwable failure) {
- LOG.error(failure.getMessage());
- failureThrowable.compareAndSet(null, failure);
- hasFailure.set(true);
- }
- });
-
- // This makes flush() blocking
- bulkProcessorBuilder.setConcurrentRequests(0);
-
- ParameterTool params = ParameterTool.fromMap(userConfig);
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
- bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
- bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
- CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
- }
-
- bulkProcessor = bulkProcessorBuilder.build();
- }
-
- @Override
- public void invoke(T element) {
- IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Emitting IndexRequest: {}", indexRequest);
- }
-
- bulkProcessor.add(indexRequest);
- }
-
- @Override
- public void close() {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- bulkProcessor = null;
- }
-
- if (client != null) {
- client.close();
- }
-
- if (node != null) {
- node.close();
- }
-
- if (hasFailure.get()) {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
- } else {
- throw new RuntimeException("An error occured in ElasticsearchSink.");
-
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
deleted file mode 100644
index 04ae40a..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.elasticsearch.action.index.IndexRequest;
-
-import java.io.Serializable;
-
-/**
- * Function that creates an {@link IndexRequest} from an element in a Stream.
- *
- * <p>
- * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
- * to prepare elements for sending them to Elasticsearch. See
- * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a>
- * for information about how to format data for adding it to an Elasticsearch index.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {
- *
- * public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- * Map<String, Object> json = new HashMap<>();
- * json.put("data", element);
- *
- * return Requests.indexRequest()
- * .index("my-index")
- * .type("my-type")
- * .source(json);
- * }
- * }
- * }</pre>
- *
- * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
- */
-public interface IndexRequestBuilder<T> extends Function, Serializable {
-
- /**
- * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element.
- *
- * @param element The element that needs to be turned in to an {@code IndexRequest}
- * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink}
- *
- * @return The constructed {@code IndexRequest}
- */
- IndexRequest createIndexRequest(T element, RuntimeContext ctx);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
deleted file mode 100644
index 33a2e47..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.LocalTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.Node;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
-
- private static final int NUM_ELEMENTS = 20;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Test
- public void testNodeClient() throws Exception{
-
- File dataDir = tempFolder.newFolder();
-
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
-
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test
- public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- List<TransportAddress> transports = Lists.newArrayList();
- transports.add(new LocalTransportAddress("1"));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch TransportClient Test");
-
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
-
- node.close();
- }
-
- @Test(expected = JobExecutionException.class)
- public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when there is no cluster to
- // connect to. We don't hava such as test for the Node Client version since that
- // one will block and wait for a cluster to come online
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
-
- List<TransportAddress> transports = Lists.newArrayList();
- transports.add(new LocalTransportAddress("1"));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
- }
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element.f1);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element.f0.toString())
- .source(json);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
deleted file mode 100644
index 136ae77..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- for (int i = 0; i < 20 && running; i++) {
- ctx.collect("message #" + i);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
- @Override
- public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
- }));
-
-
- env.execute("Elasticsearch Example");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
deleted file mode 100644
index dc20726..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file