You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2015/12/18 11:44:17 UTC
[17/26] incubator-atlas git commit: ATLAS-346 Atlas server loses
messages sent from Hive hook if restarted after unclean shutdown (yhemanth
via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/afb9e618
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/afb9e618
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/afb9e618
Branch: refs/heads/branch-0.6-incubating
Commit: afb9e618a290fcc79f59600418cbf68f1356fd30
Parents: 6dfbda1
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Mon Dec 14 16:29:47 2015 +0530
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Mon Dec 14 16:34:26 2015 +0530
----------------------------------------------------------------------
client/pom.xml | 6 ++
.../main/java/org/apache/atlas/AtlasClient.java | 22 +++++-
.../java/org/apache/atlas/AtlasClientTest.java | 67 ++++++++++++++++
.../notification/NotificationHookConsumer.java | 42 ++++++++++
.../NotificationHookConsumerTest.java | 82 ++++++++++++++++++++
pom.xml | 2 +-
release-log.txt | 1 +
.../atlas/web/listeners/GuiceServletConfig.java | 2 +-
8 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 279d894..d41b5bf 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -67,5 +67,11 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index becc4db..b108b25 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -19,6 +19,7 @@
package org.apache.atlas;
import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
@@ -67,6 +68,7 @@ public class AtlasClient {
public static final String DATATYPE = "dataType";
public static final String BASE_URI = "api/atlas/";
+ public static final String ADMIN_VERSION = "admin/version";
public static final String TYPES = "types";
public static final String URI_ENTITY = "entities";
public static final String URI_SEARCH = "discovery/search";
@@ -126,11 +128,29 @@ public class AtlasClient {
service = client.resource(UriBuilder.fromUri(baseUrl).build());
}
+ // for testing
+ AtlasClient(WebResource service) {
+ this.service = service;
+ }
+
protected Configuration getClientProperties() throws AtlasException {
return ApplicationProperties.get();
}
- enum API {
+    /**
+     * Checks whether the Atlas server can be reached by calling the admin version API.
+     *
+     * @return {@code true} if the version call completes; {@code false} if the call
+     *         fails with a {@link ClientHandlerException}
+     * @throws AtlasServiceException if thrown while preparing or making the version call
+     */
+    public boolean isServerReady() throws AtlasServiceException {
+        final WebResource versionResource = getResource(API.VERSION);
+        try {
+            callAPIWithResource(API.VERSION, versionResource);
+        } catch (ClientHandlerException notReachable) {
+            // The request could not be handled by the client; report "not ready".
+            return false;
+        }
+        return true;
+    }
+
+ public enum API {
+
+ //Admin operations
+ VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK),
//Type operations
CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/client/src/test/java/org/apache/atlas/AtlasClientTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
new file mode 100644
index 0000000..1e7eed1
--- /dev/null
+++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link AtlasClient#isServerReady()} using a mocked Jersey {@link WebResource}.
+ */
+public class AtlasClientTest {
+
+    /** Canned body handed back by the mocked version endpoint. */
+    private static final String VERSION_RESPONSE =
+            "{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\","
+                    + "\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}";
+
+    /** A 200 response from the version endpoint means the server is ready. */
+    @Test
+    public void shouldVerifyServerIsReady() throws AtlasServiceException {
+        WebResource service = mock(WebResource.class);
+        AtlasClient client = new AtlasClient(service);
+
+        WebResource.Builder requestBuilder = stubVersionEndpoint(service);
+        ClientResponse okResponse = mock(ClientResponse.class);
+        when(okResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
+        when(okResponse.getEntity(String.class)).thenReturn(VERSION_RESPONSE);
+        when(requestBuilder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null))
+                .thenReturn(okResponse);
+
+        assertTrue(client.isServerReady());
+    }
+
+    /** A {@link ClientHandlerException} from the version call means the server is not ready. */
+    @Test
+    public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
+        WebResource service = mock(WebResource.class);
+        AtlasClient client = new AtlasClient(service);
+
+        WebResource.Builder requestBuilder = stubVersionEndpoint(service);
+        when(requestBuilder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null))
+                .thenThrow(new ClientHandlerException());
+
+        assertFalse(client.isServerReady());
+    }
+
+    /**
+     * Wires the mocked root resource so that the version path resolves to a mocked
+     * request builder accepting and sending JSON.
+     *
+     * @param service mocked root resource passed to the client under test
+     * @return the mocked builder on which the HTTP method call can be stubbed
+     */
+    private WebResource.Builder stubVersionEndpoint(WebResource service) {
+        WebResource versionResource = mock(WebResource.class);
+        WebResource.Builder requestBuilder = mock(WebResource.Builder.class);
+        when(service.path(AtlasClient.API.VERSION.getPath())).thenReturn(versionResource);
+        when(versionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(requestBuilder);
+        when(requestBuilder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(requestBuilder);
+        return requestBuilder;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ffeb406..1bee26f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,6 +22,7 @@ import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
@@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service {
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
+ public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@Inject
private NotificationInterface notificationInterface;
@@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service {
}
}
+    /**
+     * Small wrapper around {@link Thread#sleep(long)} so that the wait between
+     * readiness checks can be replaced by a mock in tests.
+     */
+    static class Timer {
+        /**
+         * Pauses the calling thread.
+         *
+         * @param interval time to sleep, in milliseconds
+         * @throws InterruptedException if the thread is interrupted while sleeping
+         */
+        public void sleep(int interval) throws InterruptedException {
+            Thread.sleep(interval);
+        }
+    }
+
class HookConsumer implements Runnable {
private final NotificationConsumer<JSONArray> consumer;
+ private final AtlasClient client;
public HookConsumer(NotificationConsumer<JSONArray> consumer) {
+ this(atlasClient, consumer);
+ }
+
+ public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
+ this.client = client;
this.consumer = consumer;
}
@Override
public void run() {
+
+ if (!serverAvailable(new NotificationHookConsumer.Timer())) {
+ return;
+ }
+
while(consumer.hasNext()) {
JSONArray entityJson = consumer.next();
LOG.info("Processing message {}", entityJson);
@@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service {
}
}
}
+
+        /**
+         * Polls the Atlas server until it reports ready, pausing
+         * {@link #SERVER_READY_WAIT_TIME_MS} milliseconds between attempts.
+         *
+         * @param timer used to pause between readiness checks
+         * @return {@code true} once the server reports ready; {@code false} if the wait is
+         *         interrupted or the readiness check throws an {@link AtlasServiceException}
+         */
+        boolean serverAvailable(Timer timer) {
+            try {
+                while (!client.isServerReady()) {
+                    try {
+                        LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
+                                SERVER_READY_WAIT_TIME_MS);
+                        timer.sleep(SERVER_READY_WAIT_TIME_MS);
+                    } catch (InterruptedException e) {
+                        // Catching InterruptedException clears the interrupt status; restore it
+                        // so the code that owns this thread can still observe the interruption.
+                        Thread.currentThread().interrupt();
+                        LOG.info("Interrupted while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                        return false;
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.info(
+                        "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
+                                "exiting consumer thread.", e);
+                return false;
+            }
+            LOG.info("Atlas Server is ready, can start reading Kafka events.");
+            return true;
+        }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
new file mode 100644
index 0000000..e4d7f8c
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class NotificationHookConsumerTest {
+
+ @Test
+ public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException {
+ AtlasClient atlasClient = mock(AtlasClient.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenReturn(true);
+
+ assertTrue(hookConsumer.serverAvailable(timer));
+
+ verifyZeroInteractions(timer);
+ }
+
+ @Test
+ public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException {
+ AtlasClient atlasClient = mock(AtlasClient.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
+
+ assertTrue(hookConsumer.serverAvailable(timer));
+
+ verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+ }
+
+ @Test
+ public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException {
+ AtlasClient atlasClient = mock(AtlasClient.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
+ when(atlasClient.isServerReady()).thenReturn(false);
+
+ assertFalse(hookConsumer.serverAvailable(timer));
+ }
+
+ @Test
+ public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException {
+ AtlasClient atlasClient = mock(AtlasClient.class);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer();
+ NotificationHookConsumer.HookConsumer hookConsumer =
+ notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
+ NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
+ when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception()));
+
+ assertFalse(hookConsumer.serverAvailable(timer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 929d255..d9c3df1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -323,7 +323,7 @@
<node.version>v0.10.30</node.version>
<slf4j.version>1.7.7</slf4j.version>
<jetty.version>9.2.12.v20150709</jetty.version>
- <jersey.version>1.10</jersey.version>
+ <jersey.version>1.19</jersey.version>
<jackson.version>1.8.3</jackson.version>
<tinkerpop.version>2.6.0</tinkerpop.version>
<titan.version>0.5.4</titan.version>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 07a3e9b..ffd69ea 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai)
ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/afb9e618/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index f0d80cb..c1f6a9b 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
}
protected void startServices() {
- LOG.debug("Starting services");
+ LOG.info("Starting services");
Services services = injector.getInstance(Services.class);
services.start();
}