Posted to issues@flink.apache.org by mcfongtw <gi...@git.apache.org> on 2017/08/27 01:16:54 UTC

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

GitHub user mcfongtw opened a pull request:

    https://github.com/apache/flink/pull/4605

    [FLINK-4500] [C* Connector] CassandraSinkBase implements CheckpointedFunction

    ## What is the purpose of the change
    Make CassandraSinkBase implement CheckpointedFunction so that all in-flight mutation messages are sent to the C* sink before a checkpoint is taken. As a result, the checkpoint no longer misses records that were still in flight.
    
    ## Brief change log
    
    * Implement CheckpointedFunction to (optionally) wait for all pending records to be flushed to the C* sink before a checkpoint is performed (or before the connection is closed).
    * Add debug messages in CassandraSinkBase.
    * Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
    * Add unit tests for the failure-handling logic on errors thrown at different stages.
    * Add unit tests for flushing pending records when a checkpoint is performed.
    * Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
    * Add CassandraBaseTest to the suppression list so that it can use Guava imports.
    * In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.
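The flush-on-checkpoint behavior in the change log can be illustrated with a small, Flink-free Java model. This is a sketch under stated assumptions, not the code in this PR: `FlushingSink` is a hypothetical class, `CompletableFuture` stands in for the Cassandra driver's `ResultSetFuture`, and `snapshotState()` is a plain method here rather than the `CheckpointedFunction` callback. The names `updatesPending`, `flushOnCheckpoint` and `getNumOfPendingRecords` are borrowed from the diff and tests quoted in this thread.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical, Flink-free model of the sink's bookkeeping: writes are
 * asynchronous, a counter tracks in-flight ones, and a failure is remembered.
 * CompletableFuture stands in for the Cassandra driver's ResultSetFuture.
 */
final class FlushingSink<T> {
    private final Function<T, CompletableFuture<?>> writer;
    private final AtomicInteger updatesPending = new AtomicInteger();
    private final Object lock = new Object();
    private volatile Throwable asyncError;
    private boolean flushOnCheckpoint = true;

    FlushingSink(Function<T, CompletableFuture<?>> writer) {
        this.writer = writer;
    }

    void setFlushOnCheckpoint(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    /** Issues one asynchronous write; the callback may run on another thread. */
    void invoke(T value) throws IOException {
        checkAsyncError();
        updatesPending.incrementAndGet();
        writer.apply(value).whenComplete((result, error) -> {
            if (error != null) {
                asyncError = error; // remembered so the task thread can rethrow it
            }
            synchronized (lock) {
                updatesPending.decrementAndGet();
                lock.notifyAll(); // wake a snapshotState() waiting for zero
            }
        });
    }

    /** Called before a checkpoint: optionally waits until no write is in flight. */
    void snapshotState() throws IOException, InterruptedException {
        checkAsyncError();
        if (flushOnCheckpoint) {
            synchronized (lock) {
                while (updatesPending.get() > 0) {
                    lock.wait();
                }
            }
        }
        checkAsyncError(); // surface failures of the writes we just waited for
    }

    int getNumOfPendingRecords() {
        return updatesPending.get();
    }

    private void checkAsyncError() throws IOException {
        Throwable error = asyncError;
        if (error != null) {
            throw new IOException("Error while sending value.", error);
        }
    }
}
```

The counter is decremented under the same lock that `snapshotState()` waits on, so a completion cannot slip in between the check and the `wait()`; the failure is stored before the decrement, so the second `checkAsyncError()` observes it.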
    
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *CassandraBaseTest*.
    
    This change added tests and can be verified as follows:
    
    * Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
    * Add unit tests for the failure-handling logic on errors thrown at different stages.
    * Add unit tests for flushing pending records when a checkpoint is performed.
    * Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
    * Add CassandraBaseTest to the suppression list so that it can use Guava imports.
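The Immediate / Delayed futures mentioned above can be approximated as follows. This is a hypothetical helper, not the PR's test code: `CompletableFuture` again replaces `ResultSetFuture`, `IMMEDIATE_SUCCESS` and `IMMEDIATE_FAILURE` mirror the `PredeterminedResult` values used in the quoted tests, and the `DELAYED_*` values and the `TestFutures` class are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Outcomes a test can request; the DELAYED_* values are hypothetical. */
enum PredeterminedResult {
    IMMEDIATE_SUCCESS, IMMEDIATE_FAILURE, DELAYED_SUCCESS, DELAYED_FAILURE
}

/** Hypothetical helper producing futures that stand in for ResultSetFuture in tests. */
final class TestFutures {
    // Daemon timer thread so a forgotten delayed future never blocks JVM exit.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "test-future-timer");
            t.setDaemon(true);
            return t;
        });

    static CompletableFuture<String> create(PredeterminedResult result, long delayMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        switch (result) {
            case IMMEDIATE_SUCCESS:
                future.complete("ok"); // already done when returned
                break;
            case IMMEDIATE_FAILURE:
                future.completeExceptionally(new RuntimeException("simulated write failure"));
                break;
            case DELAYED_SUCCESS:
                // Stays in flight until the timer fires on another thread.
                TIMER.schedule(() -> { future.complete("ok"); }, delayMillis, TimeUnit.MILLISECONDS);
                break;
            case DELAYED_FAILURE:
                TIMER.schedule(() -> {
                    future.completeExceptionally(new RuntimeException("simulated write failure"));
                }, delayMillis, TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException("Unknown result: " + result);
        }
        return future;
    }
}
```

An immediate future exercises the path where the completion callback runs synchronously on the calling thread; a delayed one keeps the write in flight long enough to test waiting on pending records.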
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no** (maybe) )
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mcfongtw/flink FLINK-4500

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4605
    
----
commit caefe390bf2aaa22d996cc24a31a3ba76241fb23
Author: Michael Fong <mc...@gmail.com>
Date:   2017-08-14T12:57:06Z

    [FLINK-4500] CassandraSinkBase implements CheckpointedFunction
    
    * Implement CheckpointedFunction to (optionally) wait for all pending records to be flushed to the C* sink before taking a snapshot (or before closing the connection).
    
    * Add debug messages in CassandraSinkBase.
    
    * Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
    
    * Add unit tests for the failure-handling logic on errors thrown at different stages.
    
    * Add unit tests for flushing pending records when a checkpoint is performed.
    
    * Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
    
    * Add CassandraBaseTest to the suppression list so that it can use Guava imports.
    
    * In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.

----



[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139950435
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    --- End diff --
    
    That is right. Thanks.


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139658146
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
     
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
     
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     
     	private final AtomicInteger updatesPending = new AtomicInteger();
     
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --
    
    I get your point about leaving these behaviors to the subclasses of CassandraSinkBase, but I was wondering whether some of this 'common behavior' should be (or will be) unified across the different data sinks (Kafka, ES, etc.). They all currently have slightly different implementations, though.


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482886
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    --- End diff --
    
    unnecessary comment  (also applies to other tests)


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    Hi, @zentol, thanks for reviewing this PR. I recall that I added a [caveat](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html#checkpointing-and-fault-tolerance) about this potential data loss to the latest C* connector documentation. Now that this fix is committed, would you like me to open another PR just to remove that warning from the documentation?


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482849
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    --- End diff --
    
    we usually put the `fail` within the `try` block (also applies to other tests)
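A minimal, JUnit-free sketch of the suggested structure: the `fail` call sits inside the `try` block, right after the call that is expected to throw. `FailingSink` and `ExpectedExceptionPattern` are hypothetical, and the local `fail` stands in for `org.junit.Assert.fail`, which likewise throws an `AssertionError`.

```java
import java.io.IOException;

/** Hypothetical sink whose close() reports an earlier asynchronous failure. */
final class FailingSink {
    private final Throwable asyncError;

    FailingSink(Throwable asyncError) {
        this.asyncError = asyncError;
    }

    void close() throws IOException {
        if (asyncError != null) {
            throw new IOException("Error while sending value.", asyncError);
        }
    }
}

final class ExpectedExceptionPattern {
    // Stand-in for org.junit.Assert.fail, which also throws AssertionError.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    /** Returns normally only if close() throws the expected IOException. */
    static void verifyCloseThrows(FailingSink sink) {
        try {
            sink.close();
            // Reached only if close() did NOT throw.
            fail("Expected close() to throw an IOException");
        } catch (IOException e) {
            if (!e.getMessage().contains("Error while sending value")) {
                throw new AssertionError("Unexpected message: " + e.getMessage());
            }
        }
    }
}
```

Because `AssertionError` is not an `IOException`, the `catch` clause cannot swallow the failure, and the early `return` inside the catch block becomes unnecessary.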


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482639
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    --- End diff --
    
    empty method that can be removed


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139484838
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    +	//Exception would have been thrown from invoke(), but asyncError was not reset to null, hence it was rethrown in close()
    +	@Ignore
    +	@Test
    +	public void testAsyncErrorThrownOnInvoke() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +			casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +		} catch (IOException e) {
    +			//expected async error thrown from invoke()
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown when checkpoint performs if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			//expected async error from snapshotState()
    +
    +			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
    +			Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to Cassandra"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	///////////////////////
    +	// Multi Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * message delivery successfully via a thraedpool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		casSinkFunc.close();
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message when checkpoint performs, accompanied
    +	 * with concurrent message delivery successfully via a thraedpool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +			Thread.sleep(500);
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		testHarness.snapshot(123L, 123L);
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +		casSinkFunc.close();
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would NOT flush all in-flight message when checkpoint performs.
    +	 */
    +	@Test
    +	public void testDoNotFlushOnPendingRecordsOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(false);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +			Thread.sleep(500);
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		testHarness.snapshot(123L, 123L);
    +		//Final pending records # > 0
    +		Assert.assertTrue(casSinkFunc.getNumOfPendingRecords() > 0);
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		casSinkFunc.close();
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * thread dispatched message failure via a thraedpool.
    --- End diff --
    
    typo: thraedpool -> threadpool


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139485608
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    --- End diff --
    
    remove this line for clarity; close() is never reached, since open() is expected to throw.


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139660015
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
     
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
     
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     
     	private final AtomicInteger updatesPending = new AtomicInteger();
     
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --
    
    Ideally all sinks should behave the same way, but that would be out of scope for this PR in particular.
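    For readers following the flushOnCheckpoint discussion: with the flag enabled, snapshotState() is expected to block until every in-flight write has completed. The snippet below is a minimal, standalone sketch of that mechanism, assuming a monitor-style wait on a pending-update counter similar to updatesPending. The class PendingRecordsTracker and its methods are hypothetical names, not the connector's code.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Hypothetical sketch (not the connector's code): counts in-flight writes and lets
     * a checkpoint block until all of them have completed, if flushing is enabled.
     */
    class PendingRecordsTracker {

    	private final AtomicInteger updatesPending = new AtomicInteger();
    	private final Object lock = new Object();
    	private final boolean flushOnCheckpoint;

    	PendingRecordsTracker(boolean flushOnCheckpoint) {
    		this.flushOnCheckpoint = flushOnCheckpoint;
    	}

    	/** Call before handing a write to the asynchronous client. */
    	void onSend() {
    		updatesPending.incrementAndGet();
    	}

    	/** Call from the write callback, on success as well as on failure. */
    	void onComplete() {
    		synchronized (lock) {
    			if (updatesPending.decrementAndGet() == 0) {
    				lock.notifyAll(); // wake a checkpoint waiting in waitUntilFlushed()
    			}
    		}
    	}

    	/** Call from snapshotState(); returns immediately when flushing is disabled. */
    	void waitUntilFlushed() throws InterruptedException {
    		if (!flushOnCheckpoint) {
    			return;
    		}
    		synchronized (lock) {
    			// the loop re-checks the counter, which also guards against spurious wake-ups
    			while (updatesPending.get() > 0) {
    				lock.wait();
    			}
    		}
    	}

    	int getNumOfPendingRecords() {
    		return updatesPending.get();
    	}
    }
    ```

    Decrementing and notifying under the same lock that the waiter checks under avoids a lost wake-up. With the flag disabled, the checkpoint simply proceeds with records still pending, which is the behavior testDoNotFlushOnPendingRecordsOnCheckpoint asserts.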


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    @zentol, could you take another look to check whether the revision addresses your concerns? Thank you.


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139685528
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * message delivery successfully via a thraedpool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    --- End diff --
    
    Will rewrite these in a more deterministic way, similar to the comment above.
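    One way to make these tests deterministic, sketched here under assumptions: instead of Thread.sleep(500), have each submitted task count down a CountDownLatch once its invoke call has returned, and let the test thread await the latch (with a timeout) before calling close() or snapshot(). The class DeterministicDispatch and its method dispatchAndAwaitInvocation are hypothetical helpers, not part of Flink.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical test helper: replaces a fixed Thread.sleep with an explicit signal. */
    final class DeterministicDispatch {

    	private DeterministicDispatch() {
    	}

    	/**
    	 * Submits numTasks copies of task to pool, then blocks until every copy has returned
    	 * (normally or by throwing) or the timeout elapses. Returns false on timeout.
    	 */
    	static boolean dispatchAndAwaitInvocation(
    			ExecutorService pool, int numTasks, Runnable task, long timeout, TimeUnit unit)
    			throws InterruptedException {
    		CountDownLatch invoked = new CountDownLatch(numTasks);
    		for (int i = 0; i < numTasks; i++) {
    			pool.submit(() -> {
    				try {
    					task.run();
    				} finally {
    					// count down even if the task throws, so the test thread never hangs
    					invoked.countDown();
    				}
    			});
    		}
    		return invoked.await(timeout, unit);
    	}
    }
    ```

    In the multi-threaded tests, the task would be the lambda that calls invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS), and close() or testHarness.snapshot(...) would run only after the helper returns true.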


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139483098
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    --- End diff --
    
    this comment doesn't make sense here, if anything it should be put into `CassandraSinkBase`.
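    The TODO describes an error that invoke() rethrows without clearing, so close() reports it a second time. A minimal sketch of one way such handling could be unified inside CassandraSinkBase, assuming the volatile asyncError field were replaced by an AtomicReference. The class AsyncErrorHolder and its methods are hypothetical, not the connector's API.

    ```java
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    /**
     * Hypothetical sketch: the write callback records the first failure, and every entry
     * point (invoke, snapshotState, close) calls the same check, which rethrows the error
     * and clears it so that it is reported exactly once.
     */
    class AsyncErrorHolder {

    	private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    	/** Called from the callback's onFailure; keeps only the first error. */
    	void onFailure(Throwable t) {
    		asyncError.compareAndSet(null, t);
    	}

    	/** Shared check for invoke(), snapshotState() and close(). */
    	void checkAsyncErrors() throws IOException {
    		// read and clear in one atomic step, so a later caller does not rethrow it
    		Throwable error = asyncError.getAndSet(null);
    		if (error != null) {
    			throw new IOException("Error while sending value.", error);
    		}
    	}
    }
    ```

    Because getAndSet(null) clears the error as it is read, an exception already thrown from invoke() would not surface again from close(), which is the double reporting the comment on the ignored testAsyncErrorThrownOnInvoke describes.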


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    I've rebased this locally (along with some cleanup) and will merge it later today.


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139484328
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    +	//Exception would have been thrown from invoke(), but asyncError was not set to null, hence it was rethrown in close()
    +	@Ignore
    +	@Test
    +	public void testAsyncErrorThrownOnInvoke() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +			casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +		} catch (IOException e) {
    +			//expected async error thrown from invoke()
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown when checkpoint performs if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			//expected async error from snapshotState()
    +
    +			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
    +			Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to Cassandra"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	///////////////////////
    +	// Multi Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * message delivery successfully via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    --- End diff --
    
    These waits are too long. They increase test execution time and are usually a sign that the test isn't stable.
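    A minimal sketch of one alternative, assuming the delayed results are completed on another thread (as the driver's I/O threads would do): each completion callback counts down a `CountDownLatch`, and the test awaits the latch with a timeout instead of sleeping a fixed 500 ms. `CompletableFuture` stands in for the driver's `ResultSetFuture`, and `AwaitCompletionSketch` is a hypothetical name, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: wait for async completions with a latch rather than Thread.sleep(). */
final class AwaitCompletionSketch {

    /**
     * Completes {@code count} futures on a separate "driver" thread and blocks until all of
     * their callbacks have fired. Returns the number of callbacks observed.
     */
    static long awaitCallbacks(int count, long timeoutMillis) {
        ScheduledExecutorService driver = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch callbacksDone = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; i++) {
                CompletableFuture<Void> result = new CompletableFuture<>();
                // Callback registered by the "sink"; counts down once the write is acknowledged.
                result.whenComplete((ok, err) -> callbacksDone.countDown());
                // Simulated delayed acknowledgement, completed 10 ms later on the driver thread.
                driver.schedule(() -> { result.complete(null); }, 10, TimeUnit.MILLISECONDS);
            }
            // Returns as soon as the last callback fires; the timeout only bounds a broken test.
            if (!callbacksDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("callbacks did not complete within " + timeoutMillis + " ms");
            }
            return count - callbacksDone.getCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for callbacks", e);
        } finally {
            driver.shutdownNow();
        }
    }
}
```

    The test then finishes as soon as the work is actually done, and a genuinely stuck callback shows up as a clear timeout failure rather than a flaky assertion.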


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482771
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    --- End diff --
    
    remove empty line


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482897
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    --- End diff --
    
    unnecessary comment (also applies to other tests)


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482615
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    --- End diff --
    
    empty method that can be removed


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139632915
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
     
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
     
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     
     	private final AtomicInteger updatesPending = new AtomicInteger();
     
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --
    
    The default should be true. Every CassandraSinkBase subclass should accept a constructor argument that is passed on to the CassandraSinkBase constructor. Additionally, the CassandraSinkBuilder should be extended to allow setting this flag to false if desired.
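    A minimal sketch of the suggested design, under the assumption that the flag is fixed at construction time: the base class keeps a `final` field, each subclass forwards a constructor argument, and the builder defaults to `true` with an explicit opt-out. All names here (`SinkBaseSketch`, `PojoSinkSketch`, `SinkBuilderSketch`, `disableFlushOnCheckpoint()`) are hypothetical and not the connector's actual API.

```java
/** Hypothetical base class: the flag is final and supplied through the constructor. */
abstract class SinkBaseSketch {
    private final boolean flushOnCheckpoint;

    protected SinkBaseSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    boolean isFlushOnCheckpoint() {
        return flushOnCheckpoint;
    }
}

/** Hypothetical subclass: forwards the flag unchanged to the base constructor. */
final class PojoSinkSketch extends SinkBaseSketch {
    PojoSinkSketch(boolean flushOnCheckpoint) {
        super(flushOnCheckpoint);
    }
}

/** Hypothetical builder: flushing is on by default and can only be switched off explicitly. */
final class SinkBuilderSketch {
    private boolean flushOnCheckpoint = true;

    SinkBuilderSketch disableFlushOnCheckpoint() {
        this.flushOnCheckpoint = false;
        return this;
    }

    SinkBaseSketch build() {
        return new PojoSinkSketch(flushOnCheckpoint);
    }
}
```

    Because the only mutable state lives in the builder, the sink itself never needs a setter, which also addresses the concern about not being able to reset a `final` flag after construction.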
    



---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139630504
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
     
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
     
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     
     	private final AtomicInteger updatesPending = new AtomicInteger();
     
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --
    
    Should we default this setting to true, then? Making it final would not allow resetting the flag after the object is constructed. Further, should this behavior be applied to all other SinkBase / Producer classes in the future?


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139483818
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    +	//Exception would have been thrown from invoke(), but asyncError was not set to null, hence it was rethrown in close()
    +	@Ignore
    +	@Test
    +	public void testAsyncErrorThrownOnInvoke() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +			casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +		} catch (IOException e) {
    +			//expected async error thrown from invoke()
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown when checkpoint performs if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			//expected async error from snapshotState()
    +
    +			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
    +			Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to Cassandra"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	///////////////////////
    +	// Multi Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * message delivery successfully via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    --- End diff --
    
    For a given sink instance, invoke() is only ever called by a single thread, so by using multiple threads we are testing undefined behavior here. (This also applies to the other tests.)
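A minimal sketch of a single-threaded alternative, assuming a hypothetical PendingTrackingSink that only models the pending-record counter (it is not Flink's or the Cassandra driver's API). All invoke() calls stay on the calling thread; only the response futures are completed from another thread, which is where the real concurrency comes from (completion callbacks typically run on a driver I/O thread). Joining that thread replaces the Thread.sleep(500) calls, so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CassandraSinkBase; it only models the pending-record counter.
class PendingTrackingSink {
    private final AtomicInteger pending = new AtomicInteger();
    // Futures returned by the simulated send(); the test decides when each one completes.
    final List<CompletableFuture<Void>> issued = new ArrayList<>();

    // Only ever called from one thread, mirroring the single-threaded invoke() contract.
    void invoke(String value) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        issued.add(response);
        pending.incrementAndGet();
        // Runs on whichever thread completes the future.
        response.whenComplete((ignored, error) -> pending.decrementAndGet());
    }

    int pendingRecords() {
        return pending.get();
    }
}

class SingleThreadedInvokeDemo {
    // Returns {pending after the invokes, pending after all responses arrived}.
    static int[] run() throws InterruptedException {
        PendingTrackingSink sink = new PendingTrackingSink();
        for (int i = 0; i < 3; i++) {
            sink.invoke("Dummy_msg"); // every invoke() happens on the calling thread
        }
        int beforeCompletion = sink.pendingRecords();

        // Simulate the driver answering asynchronously; start() safely publishes the list.
        Thread driver = new Thread(() -> sink.issued.forEach(f -> f.complete(null)));
        driver.start();
        driver.join(); // deterministic: no Thread.sleep() needed

        return new int[] {beforeCompletion, sink.pendingRecords()};
    }
}
```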


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139615174
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    --- End diff --
    
    Will do. Thanks
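The TODO in the quoted test describes the problem: the error was thrown from invoke(), but the field was not reset, so close() threw it again. A minimal sketch of one way to unify this, assuming a hypothetical AsyncErrorHolder that invoke(), snapshotState() and close() would all call (this is not Flink's API, nor the code that was eventually merged): the check atomically takes and clears the error, so each recorded failure is reported once.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper illustrating a single, shared async-error check.
class AsyncErrorHolder {
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    // Called from the completion callback; keeps only the first error until it is reported.
    void onFailure(Throwable t) {
        asyncError.compareAndSet(null, t);
    }

    // Shared by invoke(), snapshotState() and close(). getAndSet(null) clears the error,
    // so a failure already thrown from invoke() is not rethrown from close().
    void checkAsyncErrors() throws IOException {
        Throwable t = asyncError.getAndSet(null);
        if (t != null) {
            throw new IOException("Error while sending value.", t);
        }
    }
}
```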


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4605


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139484760
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
    @@ -37,29 +41,49 @@
      *
      * @param <IN> Type of the elements emitted by this sink
      */
    -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    +public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
     	protected final Logger log = LoggerFactory.getLogger(getClass());
     	protected transient Cluster cluster;
     	protected transient Session session;
     
    -	protected transient volatile Throwable exception;
    +	protected transient volatile Throwable asyncError;
     	protected transient FutureCallback<V> callback;
     
    -	private final ClusterBuilder builder;
    +	protected final ClusterBuilder builder;
     
     	private final AtomicInteger updatesPending = new AtomicInteger();
     
    +	/**
    +	 * If true, the producer will wait until all outstanding action requests have been sent to C*.
    +	 */
    +	private boolean flushOnCheckpoint = true;
    --- End diff --
    
    This should be a constructor argument and final.
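A minimal sketch of the suggestion, using a hypothetical ExampleSink in place of CassandraSinkBase (the String parameter is a stand-in for the ClusterBuilder): the flag is passed in once and stored in a final field, so it cannot be changed after the sink has been constructed. The convenience constructor keeps the previous default of flushing.

```java
import java.util.Objects;

// Hypothetical sink skeleton; not the real CassandraSinkBase.
class ExampleSink {
    private final String builderName;        // hypothetical stand-in for the ClusterBuilder
    private final boolean flushOnCheckpoint; // fixed at construction time

    ExampleSink(String builderName, boolean flushOnCheckpoint) {
        this.builderName = Objects.requireNonNull(builderName, "builderName");
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    // Keeps the previous default (flush enabled).
    ExampleSink(String builderName) {
        this(builderName, true);
    }

    boolean isFlushOnCheckpoint() {
        return flushOnCheckpoint;
    }
}
```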


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    I had to modify the PR more than I initially expected.
    
    I removed the option to disable the new behavior again; after thinking about it more, I came to the conclusion that silently losing data is never acceptable behavior.
    I also rewrote the tests to be more straightforward. 
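With the option removed, the flush is unconditional. A minimal sketch of what that means, assuming a hypothetical FlushingSink (not Flink's CassandraSinkBase) whose send function is a stand-in for send(IN): snapshotState() blocks until the pending counter reaches zero, and the counter is decremented in the completion callback for both successful and failed requests, so a failed request cannot leave a checkpoint waiting forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of an always-on flush; not the code that was merged into Flink.
class FlushingSink {
    private final Function<String, CompletableFuture<Void>> send; // stand-in for send(IN)
    private final AtomicInteger pending = new AtomicInteger();
    private final Object lock = new Object();

    FlushingSink(Function<String, CompletableFuture<Void>> send) {
        this.send = send;
    }

    void invoke(String value) {
        CompletableFuture<Void> response = send.apply(value);
        pending.incrementAndGet();
        // Runs for both success and failure, then wakes up a waiting snapshot.
        response.whenComplete((ignored, error) -> {
            synchronized (lock) {
                pending.decrementAndGet();
                lock.notifyAll();
            }
        });
    }

    // Unconditionally waits for all in-flight requests; there is no switch to skip it.
    void snapshotState() throws InterruptedException {
        synchronized (lock) {
            while (pending.get() > 0) {
                lock.wait();
            }
        }
    }

    int pendingRecords() {
        return pending.get();
    }
}
```

The sketch assumes, as in Flink, that invoke() and snapshotState() never run concurrently for one sink instance, which is why the counter is incremented outside the lock.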


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    @zentol, cool! Thanks for reviewing it!


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139485057
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if previously message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    +	//Exception would have been thrown from invoke(), but asyncError was not set to null, hence it was rethrown in close()
    +	@Ignore
    +	@Test
    +	public void testAsyncErrorThrownOnInvoke() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +			casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +		} catch (IOException e) {
    +			//expected async error thrown from invoke()
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown when checkpoint performs if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			//expected async error from snapshotState()
    +
    +			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
    +			Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to Cassandra"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	///////////////////////
    +	// Multi Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * message delivery successfully via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		casSinkFunc.close();
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message when checkpoint performs, accompanied
    +	 * with concurrent message delivery successfully via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +			Thread.sleep(500);
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		testHarness.snapshot(123L, 123L);
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +		casSinkFunc.close();
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would NOT flush all in-flight message when checkpoint performs.
    +	 */
    +	@Test
    +	public void testDoNotFlushOnPendingRecordsOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(false);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +			threadPool.submit(() -> {
    +				try {
    +
    +					casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_SUCCESS);
    +				} catch (Exception e) {
    +					LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +				}
    +			});
    +			Thread.sleep(500);
    +		}
    +
    +		//wait until the first message has been dispatched and invoked
    +		Thread.sleep(500);
    +
    +		testHarness.snapshot(123L, 123L);
    +		//Final pending records # > 0
    +		Assert.assertTrue(casSinkFunc.getNumOfPendingRecords() > 0);
    +
    +		threadPool.shutdown();
    +		threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +
    +		casSinkFunc.close();
    +	}
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight message on close(), accompanied with concurrent
    +	 * thread dispatched message failure via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithFailedMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +		try {
    +			for (int i = 0; i < MAX_THREAD_NUM; i++) {
    +				threadPool.submit(() -> {
    +					try {
    +
    +						casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.DELAYED_FAILURE);
    +					} catch (Exception e) {
    +						LOG.error("Error while dispatching sending message to Cassandra sink => {} ", e);
    +					}
    +
    +				});
    +			}
    +			//wait until the first message has been dispatched and invoked
    +			Thread.sleep(500);
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +		} finally {
    +			//wait for all message dispatching threads to end
    +			threadPool.shutdown();
    +			threadPool.awaitTermination(10, TimeUnit.SECONDS);
    +		}
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	////////////////////////////////
    +	// Utilities
    +	///////////////////////////////
    +
    +	private enum PredeterminedResult {
    +		IMMEDIATE_SUCCESS,
    +		IMMEDIATE_FAILURE,
    +		IMMEDIATE_CANCELLATION,
    +		DELAYED_SUCCESS,
    +		DELAYED_FAILURE
    +	}
    +
    +	private static class DummyCassandraSinkBase<IN, V> extends CassandraSinkBase<IN, V> {
    +
    +		@SuppressWarnings("unchecked")
    +		DummyCassandraSinkBase(ClusterBuilder clusterBuilder) {
    +			super(clusterBuilder);
    +		}
    +
    +		@Override
    +		public ListenableFuture<V> send(IN value) {
    +			return (ListenableFuture<V>) session.executeAsync(DUMMY_QUERY_STMT);
    +		}
    +
    +	}
    +
    +	private static class MockCassandraSinkBase<IN> extends CassandraSinkBase<IN, ResultSet> {
    +
    +		@SuppressWarnings("unchecked")
    +		MockCassandraSinkBase(ClusterBuilder clusterBuilder) {
    +			super(clusterBuilder);
    +
    +			cluster = mock(Cluster.class);
    +			session = mock(Session.class);
    +			when(builder.getCluster()).thenReturn(cluster);
    +			when(cluster.connect()).thenReturn(session);
    +		}
    +
    +		public void invokeWithImmediateResult(IN value, PredeterminedResult result) throws Exception {
    +			when(session.executeAsync(DUMMY_QUERY_STMT)).thenAnswer(new Answer<ResultSetFuture>() {
    +					@Override
    +					public ResultSetFuture answer(InvocationOnMock invocationOnMock) throws Throwable {
    +						ResultSetFuture predeterminedFuture = null;
    +
    +						switch (result) {
    +							case IMMEDIATE_FAILURE:
    +								predeterminedFuture = ResultSetFutures.immediateFailedFuture(new IllegalStateException("Immediate Failure!"));
    +								break;
    +
    +							case IMMEDIATE_CANCELLATION:
    +								predeterminedFuture = ResultSetFutures.immediateCancelledFuture();
    +								break;
    +
    +							case DELAYED_FAILURE:
    +								predeterminedFuture = ResultSetFutures.delayedFailedFuture(new IllegalStateException("Delayed Failure!"));
    +								break;
    +
    +							case DELAYED_SUCCESS:
    +								predeterminedFuture = ResultSetFutures.delayedFuture(null);
    +								break;
    +							//If not specified, set result to Successful
    +							default:
    +							case IMMEDIATE_SUCCESS:
    +								predeterminedFuture = ResultSetFutures.immediateFuture(null);
    +								break;
    +						}
    +
    +						log.info("Invoke with {} of {}", value, result.name());
    +
    +						return predeterminedFuture;
    +					}
    +				}
    +			);
    +			invoke(value);
    +		}
    +
    +		@Override
    +		public ListenableFuture<ResultSet> send(IN value) {
    +			return (ListenableFuture<ResultSet>) session.executeAsync(DUMMY_QUERY_STMT);
    +		}
    +
    +	}
    +
    +	public static void main(String[] args) throws Exception {
    --- End diff --
    
    remove


---

[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on the issue:

    https://github.com/apache/flink/pull/4605
  
    Hi @zentol, since this branch has some conflicts and is a bit out of date with the current master, I will rebase it. However, I'd like to know whether you are planning to review and merge this PR soon; otherwise, I can do the rebase later. 


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by mcfongtw <gi...@git.apache.org>.
Github user mcfongtw commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139685362
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    +		Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnClose() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +
    +			casSinkFunc.close();
    +		} catch (IOException e) {
    +			//expected async error from close()
    +
    +			Assert.assertTrue(e.getMessage().contains("Error while sending value"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown on invoke() if a previous message delivery failed.
    +	 */
    +	//TODO: should unify error handling logic in CassandraSinkBase
    +	//Exception would have been thrown from invoke(), but asyncError was not reset to null, hence it was rethrown in close()
    +	@Ignore
    +	@Test
    +	public void testAsyncErrorThrownOnInvoke() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +		try {
    +			casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +		} catch (IOException e) {
    +			//expected async error thrown from invoke()
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensures that an asyncError would be thrown when a checkpoint is performed if a previous message delivery failed.
    +	 */
    +	@Test
    +	public void testAsyncErrorThrownOnCheckpoint() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(casSinkFunc));
    +
    +		testHarness.open();
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			//expected async error from snapshotState()
    +
    +			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
    +			Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to Cassandra"));
    +
    +			//Final pending updates should be zero
    +			Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
    +
    +			casSinkFunc.close();
    +
    +			//done
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	///////////////////////
    +	// Multi Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures that CassandraSinkBase would flush all in-flight messages on close(), while messages are delivered
    +	 * concurrently and successfully via a thread pool.
    +	 */
    +	@Test
    +	public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +
    +		ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
    --- End diff --
    
    Okay. I can replace those multi-threaded tests with mock-based tests that mimic the same scenarios, to ensure the sink waits until all pending records are flushed before a checkpoint is triggered.
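The behaviour these tests target can be exercised deterministically, without a thread pool or a live cluster: count in-flight writes, block on checkpoint/close until the count reaches zero, then surface any asynchronous failure exactly once (clearing it, which is what the TODO about unifying error handling asks for). The following is a minimal sketch, not the actual CassandraSinkBase code. It assumes `CompletableFuture` as a stand-in for the driver's `ResultSetFuture`; the class `PendingWriteTracker` and its `send`/`flush` methods are hypothetical, and only `getNumOfPendingRecords` mirrors an accessor used in the tests above.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, simplified tracker of in-flight async writes (illustration only,
 * not Flink's CassandraSinkBase). CompletableFuture stands in for ResultSetFuture.
 */
class PendingWriteTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();
    private final Object lock = new Object();

    /** Registers a write; its completion callback records failures and decrements the counter. */
    void send(CompletableFuture<?> write) throws IOException {
        checkAsyncError(); // fail fast on an error from an earlier write
        pending.incrementAndGet(); // count before registering the callback
        write.whenComplete((result, error) -> {
            if (error != null) {
                asyncError.compareAndSet(null, error); // keep only the first failure
            }
            synchronized (lock) {
                pending.decrementAndGet();
                lock.notifyAll(); // wake a flush() waiting for the count to drop
            }
        });
    }

    /** Blocks until every in-flight write has completed, then surfaces any failure. */
    void flush() throws IOException, InterruptedException {
        synchronized (lock) {
            while (pending.get() > 0) {
                lock.wait();
            }
        }
        checkAsyncError();
    }

    int getNumOfPendingRecords() {
        return pending.get();
    }

    /** Throws the first recorded error and clears it, so it is not rethrown later. */
    private void checkAsyncError() throws IOException {
        Throwable error = asyncError.getAndSet(null);
        if (error != null) {
            throw new IOException("Error while sending value.", error);
        }
    }
}
```

Decrementing the counter while holding the same monitor that `flush()` waits on avoids a lost wake-up; a test can then complete or fail futures by hand instead of racing real threads.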


---

[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4605#discussion_r139482708
  
    --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.util.concurrent.ListenableFuture;
    +
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Tests for the {@link CassandraSinkBase}.
    + */
    +public class CassandraSinkBaseTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class);
    +
    +	private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
    +
    +	private static final String DUMMY_MESSAGE = "Dummy_msg";
    +
    +	private static final int MAX_THREAD_NUM = 3;
    +
    +	private static final int MAX_THREAD_POOL_SIZE = 2;
    +
    +	@BeforeClass
    +	public static void doSetUp() throws Exception {
    +
    +	}
    +
    +	@BeforeClass
    +	public static void doTearDown() throws Exception {
    +
    +	}
    +
    +	/**
    +	 * Test ensures a NoHostAvailableException would be thrown if a contact point added does not exist.
    +	 */
    +	@Test(expected = NoHostAvailableException.class)
    +	public void testCasHostNotFoundErrorHandling() throws Exception {
    +		CassandraSinkBase base = new DummyCassandraSinkBase<>(new ClusterBuilder() {
    +			@Override
    +			protected Cluster buildCluster(Cluster.Builder builder) {
    +				return builder
    +					.addContactPoint("127.0.0.1")
    +					.withoutJMXReporting()
    +					.withoutMetrics().build();
    +			}
    +		});
    +
    +		base.open(new Configuration());
    +		base.close();
    +	}
    +
    +
    +	///////////////////////
    +	// Single Thread Test
    +	///////////////////////
    +
    +	/**
    +	 * Test ensures the message could be delivered successfully to sink.
    +	 */
    +	@Test
    +	public void testSimpleSuccessfulPath() throws Exception {
    +		ClusterBuilder builder = mock(ClusterBuilder.class);
    +		MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
    +		casSinkFunc.setFlushOnCheckpoint(true);
    +		casSinkFunc.open(new Configuration());
    +
    +		casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
    +
    +		casSinkFunc.close();
    +
    +		//Final pending updates should be zero
    --- End diff --
    
    unnecessary comment


---