You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2015/12/18 11:44:16 UTC
[16/26] incubator-atlas git commit: ATLAS-346 Atlas server loses
messages sent from Hive hook if restarted after unclean shutdown (yhemanth
via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f8dd5dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f8dd5dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f8dd5dcb
Branch: refs/heads/branch-0.6-incubating
Commit: f8dd5dcb358b0328d34f57d5b1c6fcd730fb48e1
Parents: 6dfbda1
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Mon Dec 14 16:29:47 2015 +0530
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Mon Dec 14 16:29:47 2015 +0530
----------------------------------------------------------------------
client/pom.xml | 6 +++
.../main/java/org/apache/atlas/AtlasClient.java | 22 +++++++++-
.../notification/NotificationHookConsumer.java | 42 ++++++++++++++++++++
pom.xml | 2 +-
release-log.txt | 1 +
.../atlas/web/listeners/GuiceServletConfig.java | 2 +-
6 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 279d894..d41b5bf 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -67,5 +67,11 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index becc4db..b108b25 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -19,6 +19,7 @@
package org.apache.atlas;
import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
@@ -67,6 +68,7 @@ public class AtlasClient {
public static final String DATATYPE = "dataType";
public static final String BASE_URI = "api/atlas/";
+ public static final String ADMIN_VERSION = "admin/version";
public static final String TYPES = "types";
public static final String URI_ENTITY = "entities";
public static final String URI_SEARCH = "discovery/search";
@@ -126,11 +128,29 @@ public class AtlasClient {
service = client.resource(UriBuilder.fromUri(baseUrl).build());
}
+ // For testing: package-private constructor that uses the supplied WebResource as the service as-is.
+ AtlasClient(WebResource service) {
+ this.service = service;
+ }
+
protected Configuration getClientProperties() throws AtlasException {
return ApplicationProperties.get();
}
- enum API {
+    /**
+     * Probes the server by invoking the {@code API.VERSION} endpoint.
+     *
+     * @return {@code true} if the version call completes, {@code false} if it fails with a
+     *         {@link ClientHandlerException}
+     * @throws AtlasServiceException propagated unchanged from the version call
+     */
+    public boolean isServerReady() throws AtlasServiceException {
+        // Build the resource outside the try so only the call itself is guarded.
+        WebResource versionResource = getResource(API.VERSION);
+        boolean ready;
+        try {
+            callAPIWithResource(API.VERSION, versionResource);
+            ready = true;
+        } catch (ClientHandlerException che) {
+            ready = false;
+        }
+        return ready;
+    }
+
+ public enum API {
+
+ //Admin operations
+ VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
//Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ffeb406..1bee26f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+ public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@Inject
private NotificationInterface notificationInterface;
@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
}
}
+ /**
+ * Thin wrapper around {@link Thread#sleep(long)}.
+ * NOTE(review): presumably exists so tests can pass a stub to serverAvailable(Timer)
+ * instead of really sleeping -- confirm against the test sources.
+ */
+ static class Timer {
+ /**
+ * Sleeps the current thread.
+ *
+ * @param interval time to sleep, in milliseconds
+ * @throws InterruptedException if the thread is interrupted while sleeping
+ */
+ public void sleep(int interval) throws InterruptedException {
+ Thread.sleep(interval);
+ }
+ }
+
class HookConsumer implements Runnable {
private final NotificationConsumer<JSONArray> consumer;
+ private final AtlasClient client;
public HookConsumer(NotificationConsumer<JSONArray> consumer) {
+ this(atlasClient, consumer);
+ }
+
+ public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
+ this.client = client;
this.consumer = consumer;
}
@Override
public void run() {
+
+ if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+ return;
+ }
+
while(consumer.hasNext()) {
JSONArray entityJson = consumer.next();
LOG.info("Processing message {}", entityJson);
@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
}
}
}
+
+        /**
+         * Waits until the Atlas server reports ready, polling {@code client.isServerReady()} and
+         * sleeping {@code SERVER_READY_WAIT_TIME_MS} milliseconds between attempts.
+         *
+         * @param timer used to sleep between polls
+         * @return {@code true} once the server is ready; {@code false} if the wait was
+         *         interrupted or the readiness check threw an {@link AtlasServiceException}
+         */
+        boolean serverAvailable(Timer timer) {
+            try {
+                while (!client.isServerReady()) {
+                    try {
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+                                SERVER_READY_WAIT_TIME_MS);
+                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
+                    } catch (InterruptedException e) {
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                        // Restore the interrupt status so code further up the stack
+                        // (e.g. the executor running this consumer) can still observe it.
+                        Thread.currentThread().interrupt();
+                        return false;
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.info(
+                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                return false;
+            }
+            LOG.info("Atlas Server is ready, can start reading Kafka events.");
+            return true;
+        }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 929d255..d9c3df1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -323,7 +323,7 @@
<node.version>v0.10.30</node.version>
<slf4j.version>1.7.7</slf4j.version>
<jetty.version>9.2.12.v20150709</jetty.version>
- <jersey.version>1.10</jersey.version>
+ <jersey.version>1.19</jersey.version>
<jackson.version>1.8.3</jackson.version>
<tinkerpop.version>2.6.0</tinkerpop.version>
<titan.version>0.5.4</titan.version>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 07a3e9b..ffd69ea 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f8dd5dcb/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index f0d80cb..c1f6a9b 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
}
protected void startServices() {
- LOG.debug("Starting services");
+ LOG.info("Starting services");
Services services = injector.getInstance(Services.class);
services.start();
}