You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2015/12/18 11:44:16 UTC

[16/26] incubator-atlas git commit: ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)

ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f8dd5dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f8dd5dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f8dd5dcb

Branch: refs/heads/branch-0.6-incubating
Commit: f8dd5dcb358b0328d34f57d5b1c6fcd730fb48e1
Parents: 6dfbda1
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Mon Dec 14 16:29:47 2015 +0530
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Mon Dec 14 16:29:47 2015 +0530

----------------------------------------------------------------------
 client/pom.xml                                  |  6 +++
 .../main/java/org/apache/atlas/AtlasClient.java | 22 +++++++++-
 .../notification/NotificationHookConsumer.java  | 42 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 release-log.txt                                 |  1 +
 .../atlas/web/listeners/GuiceServletConfig.java |  2 +-
 6 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 279d894..d41b5bf 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -67,5 +67,11 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index becc4db..b108b25 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -19,6 +19,7 @@
 package org.apache.atlas;
 
 import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
@@ -67,6 +68,7 @@ public class AtlasClient {
     public static final String DATATYPE = "dataType";
 
     public static final String BASE_URI = "api/atlas/";
+    public static final String ADMIN_VERSION = "admin/version";
     public static final String TYPES = "types";
     public static final String URI_ENTITY = "entities";
     public static final String URI_SEARCH = "discovery/search";
@@ -126,11 +128,29 @@ public class AtlasClient {
         service = client.resource(UriBuilder.fromUri(baseUrl).build());
     }
 
+    // for testing
+    AtlasClient(WebResource service) {
+        this.service = service;
+    }
+
     protected Configuration getClientProperties() throws AtlasException {
         return ApplicationProperties.get();
     }
 
-    enum API {
+    public boolean isServerReady() throws AtlasServiceException {
+        WebResource resource = getResource(API.VERSION);
+        try {
+            callAPIWithResource(API.VERSION, resource);
+            return true;
+        } catch (ClientHandlerException che) {
+            return false;
+        }
+    }
+
+    public enum API {
+
+        //Admin operations
+        VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
 
         //Type operations
         CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ffeb406..1bee26f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.codehaus.jettison.json.JSONArray;
@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
 
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
     @Inject
     private NotificationInterface notificationInterface;
@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
         }
     }
 
+    static class Timer {
+        public void sleep(int interval) throws InterruptedException {
+            Thread.sleep(interval);
+        }
+    }
+
     class HookConsumer implements Runnable {
         private final NotificationConsumer<JSONArray> consumer;
+        private final AtlasClient client;
 
         public HookConsumer(NotificationConsumer<JSONArray> consumer) {
+            this(atlasClient, consumer);
+        }
+
+        public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
+            this.client = client;
             this.consumer = consumer;
         }
 
         @Override
         public void run() {
+
+            if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+                return;
+            }
+
             while(consumer.hasNext()) {
                 JSONArray entityJson = consumer.next();
                 LOG.info("Processing message {}", entityJson);
@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
                 }
             }
         }
+
+        boolean serverAvailable(Timer timer) {
+            try {
+                while (!client.isServerReady()) {
+                    try {
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+                                SERVER_READY_WAIT_TIME_MS);
+                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                        return false;
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.info(
+                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                return false;
+            }
+            LOG.info("Atlas Server is ready, can start reading Kafka events.");
+            return true;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 929d255..d9c3df1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -323,7 +323,7 @@
         <node.version>v0.10.30</node.version>
         <slf4j.version>1.7.7</slf4j.version>
         <jetty.version>9.2.12.v20150709</jetty.version>
-        <jersey.version>1.10</jersey.version>
+        <jersey.version>1.19</jersey.version>
         <jackson.version>1.8.3</jackson.version>
         <tinkerpop.version>2.6.0</tinkerpop.version>
         <titan.version>0.5.4</titan.version>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 07a3e9b..ffd69ea 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
 ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
 ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
 ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index f0d80cb..c1f6a9b 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
     }
 
     protected void startServices() {
-        LOG.debug("Starting services");
+        LOG.info("Starting services");
         Services services = injector.getInstance(Services.class);
         services.start();
     }