You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/10/29 00:31:26 UTC

[atlas] branch master updated: ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 65bcd9f  ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue
65bcd9f is described below

commit 65bcd9f57354fd5e50435d299df8508c14108184
Author: Sidharth Mishra <si...@gmail.com>
AuthorDate: Thu Oct 28 13:48:37 2021 -0700

    ATLAS-4462: Upgraded Surefire version, Updated the pom to stop jetty before start, Added retry logic for Embedded Kafka start having port bind issue
    
    Signed-off-by: Sidharth Mishra <si...@apache.org>
---
 addons/falcon-bridge/pom.xml                       |  2 +
 addons/hbase-bridge/pom.xml                        |  2 +
 addons/hive-bridge/pom.xml                         |  2 +
 addons/impala-bridge/pom.xml                       |  2 +
 addons/kafka-bridge/pom.xml                        |  2 +
 addons/sqoop-bridge/pom.xml                        |  2 +
 addons/storm-bridge/pom.xml                        |  2 +
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    | 69 +++++++++++++----
 .../apache/atlas/util/CommandHandlerUtility.java   | 88 ++++++++++++++++++++++
 pom.xml                                            |  4 +-
 webapp/pom.xml                                     |  2 +
 11 files changed, 159 insertions(+), 18 deletions(-)

diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index de22fa5..1e2ce7c 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -310,6 +310,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml
index 50fb9e8..a6ed514 100644
--- a/addons/hbase-bridge/pom.xml
+++ b/addons/hbase-bridge/pom.xml
@@ -466,6 +466,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index ea2de4d..8c3636e 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -436,6 +436,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                     </execution>
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
index a1bd592..186251a 100644
--- a/addons/impala-bridge/pom.xml
+++ b/addons/impala-bridge/pom.xml
@@ -450,6 +450,8 @@
             <id>start-jetty</id>
             <phase>pre-integration-test</phase>
             <goals>
+              <!-- stop any previous instance to free up the port -->
+              <goal>stop</goal>
               <goal>deploy-war</goal>
             </goals>
           </execution>
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index 7fe97eb..30fb53d 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -302,6 +302,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index 9d5ac52..021e93f 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -370,6 +370,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 7152543..e8106af 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -487,6 +487,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                         <configuration>
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 19717fb..0a1f02a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -19,12 +19,14 @@ package org.apache.atlas.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import kafka.zookeeper.ZooKeeperClientException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -33,19 +35,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
+import org.apache.atlas.util.CommandHandlerUtility;
 import scala.Option;
-import scala.collection.mutable.ArrayBuffer;
 
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
-import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.BindException;
 import java.util.*;
 
-
 @Component
 @Order(3)
 public class EmbeddedKafkaServer implements Service {
@@ -55,8 +56,10 @@ public class EmbeddedKafkaServer implements Service {
     private static final String ATLAS_KAFKA_DATA  = "data";
     public  static final String PROPERTY_EMBEDDED = "atlas.notification.embedded";
 
+    private static final int    MAX_RETRY_TO_ACQUIRE_PORT = 3;
+
     private final boolean           isEmbedded;
-    private final Properties        properties;
+    private       Properties        properties;
     private       KafkaServer       kafkaServer;
     private       ServerCnxnFactory factory;
 
@@ -102,7 +105,7 @@ public class EmbeddedKafkaServer implements Service {
         LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded);
     }
 
-    private String startZk() throws IOException, InterruptedException, URISyntaxException {
+    private String startZk() throws IOException, InterruptedException {
         String zkValue = properties.getProperty("zookeeper.connect");
 
         LOG.info("Starting zookeeper at {}", zkValue);
@@ -111,7 +114,20 @@ public class EmbeddedKafkaServer implements Service {
         File snapshotDir = constructDir("zk/txn");
         File logDir      = constructDir("zk/snap");
 
-        factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+        for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+            try {
+                factory     = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+                break;
+            } catch (BindException e) {
+                LOG.warn("Attempt {}: Starting zookeeper at {} failed", attemptCount, zkValue);
+
+                if(attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+                    throw e;
+                }
+
+                CommandHandlerUtility.tryKillingProcessUsingPort(zkAddress.getPort(), attemptCount != 0);
+            }
+        }
 
         factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
 
@@ -122,7 +138,7 @@ public class EmbeddedKafkaServer implements Service {
         return ret;
     }
 
-    private void startKafka() throws IOException, URISyntaxException {
+    private void startKafka() throws IOException {
         String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
 
         LOG.info("Starting kafka at {}", kafkaValue);
@@ -130,15 +146,36 @@ public class EmbeddedKafkaServer implements Service {
         URL        kafkaAddress = getURL(kafkaValue);
         Properties brokerConfig = properties;
 
-        brokerConfig.setProperty("broker.id", "1");
-        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
-        brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
-        brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
-        brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
-
-        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
-
-        kafkaServer.startup();
+        for (int attemptCount = 0; attemptCount < MAX_RETRY_TO_ACQUIRE_PORT; attemptCount++) {
+            try {
+                brokerConfig.setProperty("broker.id", "1");
+                brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+                brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
+                brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
+                brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+                kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), false);
+
+                kafkaServer.startup();
+                break;
+            } catch (KafkaException | ZooKeeperClientException e) {
+                LOG.warn("Attempt {}: kafka server with broker config {} failed", attemptCount, brokerConfig);
+
+                if (attemptCount == MAX_RETRY_TO_ACQUIRE_PORT - 1) {
+                    throw e;
+                }
+
+                if (kafkaServer != null) {
+                    try {
+                        kafkaServer.shutdown();
+                    } catch (Exception ex) {
+                        LOG.info("Failed to shutdown kafka server", ex);
+                    }
+                }
+
+                CommandHandlerUtility.tryKillingProcessUsingPort(kafkaAddress.getPort(), attemptCount != 0);
+            }
+        }
 
         LOG.info("Embedded kafka server started with broker config {}", brokerConfig);
     }
diff --git a/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
new file mode 100644
index 0000000..13ee786
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/util/CommandHandlerUtility.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.stream.Collectors;
+
+public class CommandHandlerUtility { // Static helpers that shell out to find the process reported on a TCP port and send it a kill.
+    public static final Logger LOG = LoggerFactory.getLogger(CommandHandlerUtility.class);
+    // The PID lookup pipeline is run via /bin/sh -c; the kill command is exec'd directly. Presumably POSIX-only - TODO confirm.
+    private static final String SHELL_CMD                   = "/bin/sh";
+    private static final String SHELL_CMD_OPTION            = "-c";
+    private static final String FIND_PROCESS_ID_CMD_FORMAT  = "lsof -i:%s | tail -n 1 | tr -s ' ' | cut -d' ' -f2"; // 2nd space-separated field of the last lsof line
+    private static final String KILL_PROCESS_CMD_FORMAT     = "kill %s %s" ; // first %s is the optional signal flag, second is the PID
+    private static final int    SLEEP_AFTER_SOFT_KILL_IN_MS = 10000;
+    // Looks up the PID reported for the port and sends it a kill; forceKill selects kill -9. IOException/InterruptedException are logged, not propagated.
+    public static void tryKillingProcessUsingPort(int port, boolean forceKill) {
+        String processID = findProcessIdUsingPort(port);
+        sendKillToPID(processID, forceKill);
+    }
+    // Returns the stdout of the lsof pipeline for the port, or an empty string when the command printed nothing or could not be started.
+    private static String findProcessIdUsingPort(int port) {
+        String         retPID = "";
+        // Argument vector is: shell, -c, formatted pipeline - so the pipes are interpreted by the shell rather than by exec().
+        final String[] cmd = {
+                SHELL_CMD,
+                SHELL_CMD_OPTION,
+                String.format(FIND_PROCESS_ID_CMD_FORMAT, port)
+        };
+        // NOTE(review): lsof -i:<port> may also match non-listening sockets on that port - confirm the last line is the intended process.
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            retPID = new BufferedReader(new InputStreamReader(p.getInputStream())) // NOTE(review): reader is never closed; process is not waited on
+                    .lines().collect(Collectors.joining("\n"));
+
+            if (StringUtils.isEmpty(retPID)) { // empty stdout: surface whatever was written to stderr through the warn log below
+                String errorMsg = new BufferedReader(new InputStreamReader(p.getErrorStream()))
+                        .lines().collect(Collectors.joining("\n"));
+                throw new IOException(errorMsg);
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get process ID which uses the port{}", port, e); // NOTE(review): message has no space before the placeholder
+        }
+        // Still empty here on any failure; sendKillToPID returns early for a blank PID.
+        return retPID;
+    }
+    // Runs kill [-9] <pid>; after a non-forced kill, sleeps SLEEP_AFTER_SOFT_KILL_IN_MS (presumably to let the process exit - TODO confirm).
+    private static void sendKillToPID(String pid, boolean forceKill) {
+        if (StringUtils.isBlank(pid)) {
+            return;
+        }
+        // forceKill inserts -9; otherwise the flag slot is empty and kill is invoked with only the PID.
+        final String cmd = String.format(KILL_PROCESS_CMD_FORMAT, (forceKill ? "-9 " : ""), pid);
+
+        try {
+            Runtime.getRuntime().exec(cmd);
+            // The Process returned by exec() is discarded, so the exit status of kill is never checked.
+            if (!forceKill) {
+                LOG.info("Sleeping for {} milliseconds after soft kill", SLEEP_AFTER_SOFT_KILL_IN_MS);
+                Thread.sleep(SLEEP_AFTER_SOFT_KILL_IN_MS);
+            }
+        } catch (IOException | InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored after InterruptedException
+            LOG.warn("Failed to kill the process {} which uses the port with hard kill flag{}", pid, forceKill, e); // NOTE(review): no space before the second placeholder
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 61f1323..da8260f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -765,7 +765,7 @@
         <sqoop.version>1.4.6.2.3.99.0-195</sqoop.version>
         <storm.version>2.1.0</storm.version>
         <surefire.forkCount>2C</surefire.forkCount>
-        <surefire.version>2.18.1</surefire.version>
+        <surefire.version>3.0.0-M5</surefire.version>
         <testng.version>6.9.4</testng.version>
         <tinkerpop.version>3.4.10</tinkerpop.version>
         <woodstox-core.version>5.0.3</woodstox-core.version>
@@ -1952,7 +1952,7 @@
                     <dependency>
                         <groupId>org.apache.maven.surefire</groupId>
                         <artifactId>surefire-testng</artifactId>
-                        <version>2.18.1</version>
+                        <version>${surefire.version}</version>
                     </dependency>
                 </dependencies>
             </plugin>
diff --git a/webapp/pom.xml b/webapp/pom.xml
index d90f686..858ab9a 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -768,6 +768,8 @@
                         <id>start-jetty</id>
                         <phase>pre-integration-test</phase>
                         <goals>
+                            <!-- stop any previous instance to free up the port -->
+                            <goal>stop</goal>
                             <goal>deploy-war</goal>
                         </goals>
                     </execution>