You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/03/07 13:45:34 UTC

[camel-kafka-connector] 01/05: Fixed flaky hdfs itest.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 42456d6a414b3f94c4956723e1b9f6d0a0e6f498
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Fri Mar 5 22:12:27 2021 +0100

    Fixed flaky hdfs itest.
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  28 +++--
 .../camel/kafkaconnector/hdfs/utils/HDFSEasy.java  |  26 ++++-
 tests/itests-netty-http/pom.xml                    |   5 +
 .../surce/CamelNettyHTTPPropertyFactory.java       |  61 ++++++++++
 .../surce/CamelSourceNettyHTTPITCase.java          | 123 +++++++++++++++++++++
 5 files changed, 231 insertions(+), 12 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index f12f310..55cf21f 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -17,11 +17,6 @@
 
 package org.apache.camel.kafkaconnector.hdfs.sink;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
@@ -37,9 +32,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.runners.model.InitializationError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -74,16 +75,23 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     }
 
     @BeforeEach
-    public void setUp() throws IOException, URISyntaxException {
+    public void setUp() throws IOException, URISyntaxException, InitializationError {
         topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
         currentBasePath = new Path(currentPath);
 
-        if (!hdfsEasy.delete(currentBasePath)) {
-            // This is OK: directory may not exist on the path
-            LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+        boolean hdfsServiceCorrectlyStarted = TestUtils.waitFor(() -> hdfsEasy.createFile(new Path(currentBasePath, "initTest"), "test")
+                                                                        &&  hdfsEasy.delete(new Path(currentBasePath, "initTest")));
+
+        if(hdfsServiceCorrectlyStarted) {
+            if (!hdfsEasy.delete(currentBasePath)) {
+                // This is OK: directory may not exist on the path
+                LOG.debug("The directory at {} was not removed", currentBasePath.getName());
+            }
+        } else {
+            throw new InitializationError("HDFS Service didn't start properly.");
         }
     }
 
@@ -136,7 +144,7 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
 
             LOG.debug("Retrieved file {} with contents: {}", f.getPath(), contents);
             boolean contains = contents.contains(matchString);
-            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName());
+            assertTrue(contains, "Unexpected content for the remote file " + f.getPath().getName() + " content: [" + contents + "] should contain [" + matchString + "]");
         } catch (IOException e) {
             LOG.debug("Reading returned file {} failed: {}", f.getPath(), e.getMessage());
             fail("I/O error: " + e.getMessage());
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
index 7733fe8..4e95191 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/utils/HDFSEasy.java
@@ -26,6 +26,7 @@ import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -110,8 +111,7 @@ public class HDFSEasy {
         try {
             return countFiles(path) >= minFiles;
         } catch (Exception e) {
-            LOG.warn("I/O exception while checking if file {} exists", path.getName());
-
+            LOG.warn("I/O exception: {} due to {} while checking if file {} exists", e.getMessage(), e.getCause(), path.getName());
             return false;
         }
     }
@@ -133,4 +133,26 @@ public class HDFSEasy {
             return false;
         }
     }
+
+    public boolean createFile(Path filePath, String content) {
+        FSDataOutputStream streamWriter = null;
+        try {
+            streamWriter = dfs.create(filePath);
+            streamWriter.writeBytes(content);
+            streamWriter.flush();
+        } catch (IOException e) {
+            LOG.debug("Error in file creation: " + e.getMessage());
+            return false;
+        } finally {
+            if (streamWriter != null) {
+                try {
+                    streamWriter.close();
+                } catch (IOException e) {
+                    LOG.debug("Error in file creation during stream close: " + e.getMessage());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml
index 16238cf..ddb8bf7 100644
--- a/tests/itests-netty-http/pom.xml
+++ b/tests/itests-netty-http/pom.xml
@@ -52,5 +52,10 @@
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
new file mode 100644
index 0000000..1562328
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelNettyHTTPPropertyFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> {
+    private CamelNettyHTTPPropertyFactory() {
+
+    }
+
+    public CamelNettyHTTPPropertyFactory withHost(String host) {
+        return setProperty("camel.source.path.host", host);
+    }
+
+    public CamelNettyHTTPPropertyFactory withProtocol(String protocol) {
+        return setProperty("camel.source.path.protocol", protocol);
+    }
+
+    public CamelNettyHTTPPropertyFactory withPort(int port) {
+        return setProperty("camel.source.path.port", String.valueOf(port));
+    }
+
+    public CamelNettyHTTPPropertyFactory withSync(boolean sync) {
+        return setProperty("camel.source.endpoint.sync", String.valueOf(sync));
+    }
+
+    public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) {
+        return setProperty("camel.source.endpoint.receiveBufferSize", String.valueOf(size));
+    }
+
+    public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) {
+        setProperty("transforms", "cameltypeconverter");
+        setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value");
+        return setProperty("transforms.cameltypeconverter.target.type", targetClass);
+    }
+
+    public static CamelNettyHTTPPropertyFactory basic() {
+        return new CamelNettyHTTPPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelNettyHttpSourceConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
new file mode 100644
index 0000000..e7e6468
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/surce/CamelSourceNettyHTTPITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.surce;
+
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final String TEST_MESSAGE = "testMessage";
+
+    private String topicName;
+
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterEach
+    public void tearDown() {}
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic()
+                .withKafkaTopic(topicName)
+                .withReceiveBufferSize(10)
+                .withHost("0.0.0.0")
+                .withPort(HTTP_PORT)
+                .withProtocol("http")
+                .withCamelTypeConverterTransformTo("java.lang.String");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Override
+    protected void produceTestData() {
+        int retriesLeft = 10;
+        boolean success = false;
+        while(retriesLeft > 0 && !success) {
+            try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
+
+                byte[] ipAddr = new byte[]{127, 0, 0, 1};
+                InetAddress localhost = InetAddress.getByAddress(ipAddr);
+                final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT);
+
+                LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI());
+
+                httpPost.setEntity(new StringEntity(TEST_MESSAGE));
+
+                CloseableHttpResponse response = httpclient.execute(httpPost);
+                assertEquals(200, response.getStatusLine().getStatusCode());
+                response.close();
+                httpPost.releaseConnection();
+                success = true;
+                LOG.info("Request success at {} attempt.", retriesLeft);
+            } catch (IOException e) {
+                if(retriesLeft == 1) {
+                    e.printStackTrace();
+                    fail("There should be no exceptions in sending the http test message.");
+                } else {
+                    retriesLeft--;
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException interruptedException) {
+                        interruptedException.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+        assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString());
+    }
+}