You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/09/18 17:23:29 UTC
[GitHub] flink pull request #4605: [FLINK-4500] [C* Connector] CassandraSinkBase impl...
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4605#discussion_r139482849
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java ---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBaseTest.class); // SLF4J logger scoped to this test class
+
+ private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt"; // placeholder statement text; its consumers are outside this excerpt
+
+ private static final String DUMMY_MESSAGE = "Dummy_msg"; // record value handed to the sink by the tests below
+
+ private static final int MAX_THREAD_NUM = 3; // NOTE(review): presumably the worker count for multi-threaded tests outside this excerpt - confirm
+
+ private static final int MAX_THREAD_POOL_SIZE = 2; // NOTE(review): presumably the executor pool size for those tests - confirm
+
+ /**
+  * Class-level setup hook, run once by JUnit before any test in this class.
+  * It currently initialises nothing.
+  */
+ @BeforeClass
+ public static void doSetUp() throws Exception {
+     // intentionally empty: no shared fixture is created here
+ }
+
+ /**
+  * Class-level teardown hook, run once by JUnit after all tests in this class.
+  * It currently releases nothing.
+  *
+  * <p>A teardown method must carry {@code @AfterClass}; with {@code @BeforeClass}
+  * JUnit would execute it before the tests instead of after them.
+  */
+ @AfterClass
+ public static void doTearDown() throws Exception {
+     // intentionally empty: no shared fixture needs to be released
+ }
+
+ /**
+  * Expects a {@link NoHostAvailableException} from the open/close sequence of a sink
+  * whose cluster is configured with {@code 127.0.0.1} as its only contact point.
+  * The test assumes no Cassandra node is reachable at that address.
+  */
+ @Test(expected = NoHostAvailableException.class)
+ public void testCasHostNotFoundErrorHandling() throws Exception {
+     // Cluster definition with a single local contact point; JMX reporting and metrics disabled.
+     final ClusterBuilder localhostOnly = new ClusterBuilder() {
+         @Override
+         protected Cluster buildCluster(Cluster.Builder clusterBuilder) {
+             return clusterBuilder.addContactPoint("127.0.0.1").withoutJMXReporting().withoutMetrics().build();
+         }
+     };
+     CassandraSinkBase sink = new DummyCassandraSinkBase<>(localhostOnly);
+
+     final Configuration parameters = new Configuration();
+     sink.open(parameters);
+     sink.close();
+ }
+
+
+ ///////////////////////
+ // Single Thread Test
+ ///////////////////////
+
+ /**
+  * Happy path: one record is sent with an immediately successful result, the sink is
+  * closed, and no records may remain pending afterwards.
+  */
+ @Test
+ public void testSimpleSuccessfulPath() throws Exception {
+     final ClusterBuilder mockedClusterBuilder = mock(ClusterBuilder.class);
+     MockCassandraSinkBase sink = new MockCassandraSinkBase<>(mockedClusterBuilder);
+     sink.setFlushOnCheckpoint(true);
+
+     final Configuration parameters = new Configuration();
+     sink.open(parameters);
+
+     // deliver a single record whose write completes successfully right away
+     sink.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_SUCCESS);
+
+     sink.close();
+
+     // after close() there must be no pending records left
+     Assert.assertEquals(0, sink.getNumOfPendingRecords());
+ }
+
+ /**
+ * Test ensures that an asyncError would be thrown on close() if previously message delivery failed.
+ */
+ @Test
+ public void testAsyncErrorThrownOnClose() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+ MockCassandraSinkBase casSinkFunc = new MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ casSinkFunc.open(new Configuration());
+
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE, PredeterminedResult.IMMEDIATE_FAILURE);
+ try {
+
+ casSinkFunc.close();
+ } catch (IOException e) {
+ //expected async error from close()
+
+ Assert.assertTrue(e.getMessage().contains("Error while sending value"));
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+ //done
+ return;
+ }
+
+ Assert.fail();
--- End diff --
We usually put the `fail()` call inside the `try` block, directly after the call that is expected to throw (this also applies to the other tests).
---