You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:19 UTC
[06/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
new file mode 100755
index 0000000..7a180d2
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Verifies that analysis requests for several data sets are processed concurrently, i.e. that at
+ * some point in time more than one request is in state {@link State#ACTIVE}.
+ */
+public class ParallelServiceTest extends ODFTestcase {
+    /** How many times the whole list of data set IDs is submitted. */
+    private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+    Logger log = ODFTestLogger.get();
+
+    @Test
+    public void runDataSetsInParallelSuccess() throws Exception {
+        runDataSetsInParallelAndCheckResult(Arrays.asList("successID1", "successID2"), State.FINISHED, State.FINISHED);
+    }
+
+    /**
+     * Submits one analysis request per data set ID (repeated {@link #NUMBER_OF_QUEUED_REQUESTS}
+     * times), then polls the request states and asserts that
+     * <ul>
+     * <li>polling stopped before the poll budget was used up,</li>
+     * <li>at some poll more than one request was ACTIVE, and</li>
+     * <li>the states seen at the last poll contain every expected state.</li>
+     * </ul>
+     *
+     * @param dataSetIDs    prefixes for the data set IDs; a random UUID is appended to each one
+     * @param expectedState states that must all appear among the states seen at the last poll
+     */
+    private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State... expectedState) throws Exception {
+        log.info("Running data sets in parallel: " + dataSetIDs);
+        // Arrays.toString: concatenating the varargs array directly would only log its identity hash
+        log.info("Expected state: " + Arrays.toString(expectedState));
+        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+        List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
+        List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
+        List<String> idList = new ArrayList<String>();
+
+        for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+            for (String dataSet : dataSetIDs) {
+                final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString()));
+                AnalysisResponse resp = analysisManager.runAnalysis(req);
+                req.setId(resp.getId());
+                requests.add(req);
+                idList.add(resp.getId());
+                responses.add(resp);
+            }
+        }
+        log.info("Parallel requests started: " + idList.toString());
+
+        Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
+        Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());
+
+        // check that requests are processed in parallel:
+        // there must be a point in time where both requests are in status "active"
+        log.info("Polling for status of parallel request...");
+        boolean foundPointInTimeWhereBothRequestsAreActive = false;
+        int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+        List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+        // Records why the loop ended, so the budget check below does not fail when polling
+        // legitimately stops on the very last allowed poll (maxPolls == 0).
+        boolean keepPolling;
+        do {
+            int foundActive = 0;
+            allSingleStates.clear();
+            for (AnalysisRequest request : requests) {
+                final State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+                if (state == State.ACTIVE) {
+                    log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
+                    foundActive++;
+                } else {
+                    log.info("NOT ACTIVE " + request.getId() + " _ " + state);
+                }
+                allSingleStates.add(state);
+            }
+            if (foundActive > 1) {
+                foundPointInTimeWhereBothRequestsAreActive = true;
+            }
+
+            maxPolls--;
+            Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+            // NOTE(review): polling continues while Utils.containsNone(...) is true. If containsNone means
+            // "the list holds none of these states", polling stops as soon as any request is ACTIVE or
+            // QUEUED, which looks inverted for a wait-until-finished loop -- confirm against Utils.
+            keepPolling = Utils.containsNone(allSingleStates, new State[] { State.ACTIVE, State.QUEUED });
+        } while (maxPolls > 0 && keepPolling);
+
+        Assert.assertFalse("Poll budget exhausted; last states: " + allSingleStates, keepPolling);
+        Assert.assertTrue("No point in time found where more than one request was ACTIVE",
+                foundPointInTimeWhereBothRequestsAreActive);
+        Assert.assertTrue("Last states " + allSingleStates + " do not contain all of " + Arrays.toString(expectedState),
+                allSingleStates.containsAll(Arrays.asList(expectedState)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
new file mode 100755
index 0000000..5e3d97e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.test.TestEnvironmentInitializer;
+
+public class TestEnvironmentMessagingInitializer implements TestEnvironmentInitializer {
+
+ public TestEnvironmentMessagingInitializer() {
+ }
+
+ public void start() {
+ Logger logger = Logger.getLogger(TestEnvironmentMessagingInitializer.class.getName());
+ try {
+ logger.info("Starting Test-Kafka during initialization...");
+ TestKafkaStarter starter = new TestKafkaStarter();
+ starter.startKafka();
+ logger.info("Test-Kafka initialized");
+ } catch (Exception exc) {
+ logger.log(Level.INFO, "Exception occurred while starting test kafka", exc);
+ throw new RuntimeException(exc);
+ }
+ }
+
+ @Override
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getName() {
+ return "Kafka1001";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
new file mode 100755
index 0000000..1c3025e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.rmi.NotBoundException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.apache.atlas.odf.core.Utils;
+
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+public class TestKafkaStarter {
+
+ public static boolean deleteRecursive(File path) throws FileNotFoundException {
+ if (!path.exists()) {
+ throw new FileNotFoundException(path.getAbsolutePath());
+ }
+ boolean ret = true;
+ if (path.isDirectory()) {
+ for (File f : path.listFiles()) {
+ ret = ret && deleteRecursive(f);
+ }
+ }
+ return ret && path.delete();
+ }
+
+ static Thread zookeeperThread = null;
+ static boolean kafkaStarted = false;
+ static Object lockObject = new Object();
+ static KafkaServerStartable kafkaServer = null;
+ static ZooKeeperServerMainWithShutdown zooKeeperServer = null;
+
+
+ boolean cleanData = true; // all data is cleaned at server start !!
+
+ public boolean isCleanData() {
+ return cleanData;
+ }
+
+ public void setCleanData(boolean cleanData) {
+ this.cleanData = cleanData;
+ }
+
+ Logger logger = Logger.getLogger(TestKafkaStarter.class.getName());
+
+ void log(String s) {
+ logger.info(s);
+ }
+
+ int zookeeperStartupTime = 10000;
+ int kafkaStartupTime = 10000;
+
+ static class ZooKeeperServerMainWithShutdown extends ZooKeeperServerMain {
+ public void shutdown() {
+ super.shutdown();
+ }
+ }
+
+ private void startZookeeper() throws Exception {
+ log("Starting zookeeper");
+
+ final Properties zkProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties");
+ final String zkPort = (String) zkProps.get("clientPort");
+ if (zooKeeperServer == null) {
+ log("zookeeper properties: " + zkProps);
+ if (cleanData) {
+ String dataDir = zkProps.getProperty("dataDir");
+ log("Removing all data from zookeeper data dir " + dataDir);
+ File dir = new File(dataDir);
+ if (dir.exists()) {
+ if (!deleteRecursive(dir)) {
+ throw new IOException("Could not delete directory " + dataDir);
+ }
+ }
+ }
+ final ZooKeeperServerMainWithShutdown zk = new ZooKeeperServerMainWithShutdown();
+ final ServerConfig serverConfig = new ServerConfig();
+ log("Loading zookeeper config...");
+ QuorumPeerConfig zkConfig = new QuorumPeerConfig();
+ zkConfig.parseProperties(zkProps);
+ serverConfig.readFrom(zkConfig);
+
+ Runnable zookeeperStarter = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ log("Now starting Zookeeper with API...");
+ zk.runFromConfig(serverConfig);
+ } catch (BindException ex) {
+ log("Embedded zookeeper could not be started, port is already in use. Trying to use external zookeeper");
+ ZooKeeper zk = null;
+ try {
+ zk = new ZooKeeper("localhost:" + zkPort, 5000, null);
+ if (zk.getState().equals(States.CONNECTED)) {
+ log("Using existing zookeeper running on port " + zkPort);
+ return;
+ } else {
+ throw new NotBoundException();
+ }
+ } catch (Exception zkEx) {
+ throw new RuntimeException("Could not connect to zookeeper on port " + zkPort + ". Please close all applications listening on this port.");
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, "An error occured closing the zk connection", e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ }
+ };
+
+ zookeeperThread = new Thread(zookeeperStarter);
+ zookeeperThread.setDaemon(true);
+ zookeeperThread.start();
+ log("Zookeeper start initiated");
+ zooKeeperServer = zk;
+ }
+ ZkConnection conn = new ZkConnection("localhost:" + zkPort);
+ final CountDownLatch latch = new CountDownLatch(1);
+ conn.connect(new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ log("Zookeeper event: " + event.getState());
+ if (event.getState().equals(KeeperState.SyncConnected)) {
+ log("Zookeeper server up and running");
+ latch.countDown();
+ }
+ }
+ });
+
+ boolean zkReady = latch.await(zookeeperStartupTime, TimeUnit.MILLISECONDS);
+ if (zkReady) {
+ log("Zookeeper initialized and started");
+
+ } else {
+ logger.severe("Zookeeper could not be initialized within " + (zookeeperStartupTime / 1000) + " sec");
+ }
+ conn.close();
+ }
+
+ public boolean isRunning() {
+ return kafkaStarted;
+ }
+
+ public void startKafka() throws Exception {
+ synchronized (lockObject) {
+ if (kafkaStarted) {
+ log("Kafka already running");
+ return;
+ }
+ this.startZookeeper();
+
+ log("Starting Kafka server...");
+ Properties kafkaProps = Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties");
+ log("Kafka properties: " + kafkaProps);
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProps);
+ int kafkaPort = kafkaConfig.port();
+ if (cleanData && isPortAvailable(kafkaPort)) {
+ String logDir = kafkaProps.getProperty("log.dirs");
+ log("Removing all data from kafka log dir: " + logDir);
+ File dir = new File(logDir);
+ if (dir.exists()) {
+ if (!deleteRecursive(dir)) {
+ throw new IOException("Kafka logDir could not be deleted: " + logDir);
+ }
+ }
+ }
+ if (!isPortAvailable(kafkaPort)) {
+ log("Kafka port " + kafkaPort + " is already in use. "
+ + "Checking if zookeeper has a registered broker on this port to make sure it is an existing kafka instance using the port.");
+ ZooKeeper zk = new ZooKeeper(kafkaConfig.zkConnect(), 10000, null);
+ try {
+ List<String> ids = zk.getChildren("/brokers/ids", false);
+ if (ids != null && !ids.isEmpty()) {
+ for (String id : ids) {
+ String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
+ JSONObject broker = new JSONObject(brokerInfo);
+ Integer port = new Integer(String.valueOf(broker.get("port")));
+ if (port != null && port.equals(kafkaPort)) {
+ log("Using externally started kafka broker on port " + port);
+ kafkaStarted = true;
+ return;
+ }
+ }
+ }
+ } catch (NoNodeException ex) {
+ log("No brokers registered with zookeeper!");
+ throw new RuntimeException("Kafka broker port " + kafkaPort
+ + " not available and no broker found! Please close all running applications listening on this port");
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, "An error occured closing the zk connection", e);
+ }
+ }
+ }
+ }
+ KafkaServerStartable kafka = KafkaServerStartable.fromProps(kafkaProps);
+ kafka.startup();
+ log("Kafka server start initiated");
+
+ kafkaServer = kafka;
+ log("Give Kafka a maximum of " + kafkaStartupTime + " ms to start");
+ ZkClient zk = new ZkClient(kafkaConfig.zkConnect(), 10000, 5000, ZKStringSerializer$.MODULE$);
+ int maxRetryCount = kafkaStartupTime / 1000;
+ int cnt = 0;
+ while (cnt < maxRetryCount) {
+ cnt++;
+ Seq<Broker> allBrokersInCluster = new ZkUtils(zk, new ZkConnection(kafkaConfig.zkConnect()), false).getAllBrokersInCluster();
+ List<Broker> brokers = JavaConversions.seqAsJavaList(allBrokersInCluster);
+ for (Broker broker : brokers) {
+ if (broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port() == kafkaPort) {
+ log("Broker is registered, Kafka is available after " + cnt + " seconds");
+ kafkaStarted = true;
+ return;
+ }
+ }
+ Thread.sleep(1000);
+ }
+ logger.severe("Kafka broker was not started after " + kafkaStartupTime + " ms");
+ }
+ }
+
+ public void shutdownKafka() {
+ // do nothing for shutdown
+ }
+
+ boolean isPortAvailable(int port) {
+ ServerSocket ss = null;
+ DatagramSocket ds = null;
+ try {
+ ss = new ServerSocket(port);
+ ss.setReuseAddress(true);
+ ds = new DatagramSocket(port);
+ ds.setReuseAddress(true);
+ return true;
+ } catch (IOException e) {
+ } finally {
+ if (ds != null) {
+ ds.close();
+ }
+
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
new file mode 100755
index 0000000..4769c95
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
@@ -0,0 +1,136 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+listeners=PLAINTEXT://:9092
+
+# The port the socket server listens on
+# port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs=/tmp/odf-embedded-test-kafka/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=24
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
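+# Timeout in ms for connecting to zookeeper.
+# NOTE: Kafka's broker setting for this timeout is named zookeeper.connection.timeout.ms; the key below does not use that name.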
+zookeeperConnectionTimeoutMs=6000
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..5611c29
--- /dev/null
+++ b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,18 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+## USE for TESTs only
+
+ODFConfigurationStorage=MockConfigurationStorage
+SparkServiceExecutor=MockSparkServiceExecutor
+NotificationManager=TestNotificationManager
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/.gitignore b/odf/odf-spark-example-application/.gitignore
new file mode 100755
index 0000000..d523581
--- /dev/null
+++ b/odf/odf-spark-example-application/.gitignore
@@ -0,0 +1,20 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/pom.xml b/odf/odf-spark-example-application/pom.xml
new file mode 100755
index 0000000..a2baa9e
--- /dev/null
+++ b/odf/odf-spark-example-application/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- NOTE(review): the parent groupId (com.apache.atlas.odf) differs from the groupId used for the
+      odf-api dependency below (org.apache.atlas.odf); confirm which groupId the parent odf POM declares. -->
+ <parent>
+ <groupId>com.apache.atlas.odf</groupId>
+ <artifactId>odf</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>odf-spark-example-application</artifactId>
+ <packaging>jar</packaging>
+ <name>odf-spark-example-application</name>
+ <build>
+ <plugins>
+ <!-- compile for Java 1.7 source/bytecode level -->
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <!-- builds an additional jar-with-dependencies during the package phase -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ </build>
+ <dependencies>
+ <!-- Spark artifacts use Maven 'provided' scope: they are expected on the runtime classpath and
+      are not bundled into the assembly jar -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency> <!-- Spark dependency -->
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>2.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-api</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
new file mode 100755
index 0000000..f5f7b70
--- /dev/null
+++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryServiceBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+
+/**
+ * Example Spark discovery service implementing the generic {@link SparkDiscoveryService} interface.
+ * It loads the requested data set into a Spark data frame and delegates the analysis to
+ * {@link SummaryStatistics#processDataFrame}.
+ */
+public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService {
+    static Logger logger = Logger.getLogger(SparkDiscoveryServiceExample.class.getName());
+
+    /**
+     * Checks whether the data set can be loaded into a Spark data frame.
+     * <p>
+     * The result is marked as accessible before the data is read. If creating the data frame or
+     * printing its first rows fails, the exception propagates to the caller rather than being
+     * recorded in the result.
+     *
+     * @param dataSetContainer container holding the data set to check
+     * @return a check result whose data access is {@code Possible}
+     */
+    @Override
+    public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+        logger.log(Level.INFO, "Checking data set access.");
+        DataSetCheckResult result = new DataSetCheckResult();
+        result.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+        // Printing the first rows verifies that the data frame can actually be accessed.
+        SparkUtils.createDataFrame(this.spark, dataSetContainer, this.mds).show(10);
+        return result;
+    }
+
+    /**
+     * Runs {@link SummaryStatistics#processDataFrame} on the request's data set and converts the
+     * returned annotation data frames into ODF annotations.
+     *
+     * @param request the discovery service request naming the data set to analyze
+     * @return a response with code {@code OK} carrying the created annotations
+     */
+    @Override
+    public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+        logger.log(Level.INFO, "Starting discovery service.");
+        DataSetContainer container = request.getDataSetContainer();
+        Dataset<Row> input = SparkUtils.createDataFrame(this.spark, container, this.mds);
+        Map<String, Dataset<Row>> annotationFrames = SummaryStatistics.processDataFrame(this.spark, input, null);
+
+        DiscoveryServiceSyncResponse syncResponse = new DiscoveryServiceSyncResponse();
+        syncResponse.setCode(ResponseCode.OK);
+        syncResponse.setDetails("Discovery service successfully completed.");
+        syncResponse.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationFrames, this.mds));
+        return syncResponse;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
new file mode 100755
index 0000000..a7d1542
--- /dev/null
+++ b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.spark.SparkFiles;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class SummaryStatistics {
+ static Logger logger = Logger.getLogger(SummaryStatistics.class.getName());
+ private static final String CSV_FILE_PARAMETER = "-dataFile=";
+ // The following constant is defined in class DiscoveryServiceSparkEndpoint but is duplicated here to avoid dependencies to the ODF code:
+ private static final String ANNOTATION_PROPERTY_COLUMN_NAME = "ODF_ANNOTATED_COLUMN";
+
+ // The main method is only available for testing purposes and is not called by ODF
+ public static void main(String[] args) {
+ logger.log(Level.INFO, "Running spark launcher with arguments: " + args[0]);
+ if ((args[0] == null) || (!args[0].startsWith(CSV_FILE_PARAMETER))) {
+ System.out.println(MessageFormat.format("Error: Spark Application Parameter '{0}' is missing.", CSV_FILE_PARAMETER));
+ System.exit(1);
+ }
+ String dataFilePath = SparkFiles.get(args[0].replace(CSV_FILE_PARAMETER, ""));
+ logger.log(Level.INFO, "Data file path is " + dataFilePath);
+
+ // Create Spark session
+ SparkSession spark = SparkSession.builder().master("local").appName("ODF Spark example application").getOrCreate();
+
+ // Read CSV file into data frame
+ Dataset<Row> df = spark.read()
+ .format("com.databricks.spark.csv")
+ .option("inferSchema", "true")
+ .option("header", "true")
+ .load(dataFilePath);
+
+ // Run actual job and print result
+ Map<String, Dataset<Row>> annotationDataFrameMap = null;
+ try {
+ annotationDataFrameMap = processDataFrame(spark, df, args);
+ } catch (Exception e) {
+ logger.log(Level.INFO, MessageFormat.format("An error occurred while processing data set {0}:", args[0]), e);
+ } finally {
+ // Close and stop spark context
+ spark.close();
+ spark.stop();
+ }
+ if (annotationDataFrameMap == null) {
+ System.exit(1);
+ } else {
+ // Print all annotationDataFrames for all annotation types to stdout
+ for (Map.Entry<String, Dataset<Row>> entry : annotationDataFrameMap.entrySet()) {
+ logger.log(Level.INFO, "Result data frame for annotation type " + entry.getKey() + ":");
+ entry.getValue().show();
+ }
+ }
+ }
+
+ // The following method contains the actual implementation of the ODF Spark discovery service
+ public static Map<String,Dataset<Row>> processDataFrame(SparkSession spark, Dataset<Row> df, String[] args) {
+ logger.log(Level.INFO, "Started summary statistics Spark application.");
+ Map<String, Dataset<Row>> resultMap = new HashMap<String, Dataset<Row>>();
+
+ // Print input data set
+ df.show();
+
+ // Create column annotation data frame that contains basic data frame statistics
+ Dataset<Row> dfStatistics = df.describe();
+
+ // Rename "summary" column to ANNOTATION_PROPERTY_COLUMN_NAME
+ String[] columnNames = dfStatistics.columns();
+ columnNames[0] = ANNOTATION_PROPERTY_COLUMN_NAME;
+ Dataset<Row> summaryStatistics = dfStatistics.toDF(columnNames);
+ summaryStatistics.show();
+ String columnAnnotationTypeName = "SparkSummaryStatisticsAnnotation";
+
+ // Transpose table to turn it into format required by ODF
+ Dataset<Row> columnAnnotationDataFrame = SparkUtils.transposeDataFrame(spark, summaryStatistics);
+ columnAnnotationDataFrame.show();
+
+ // Create table annotation that contains the data frame's column count
+ String tableAnnotationTypeName = "SparkTableAnnotation";
+ Dataset<Row> tableAnnotationDataFrame = columnAnnotationDataFrame.select(new Column("count")).limit(1);
+ tableAnnotationDataFrame.show();
+
+ // Add annotation data frames to result map
+ resultMap.put(columnAnnotationTypeName, columnAnnotationDataFrame);
+ resultMap.put(tableAnnotationTypeName, tableAnnotationDataFrame);
+
+ logger.log(Level.INFO, "Spark job finished.");
+ return resultMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark/.gitignore b/odf/odf-spark/.gitignore
new file mode 100755
index 0000000..cde346c
--- /dev/null
+++ b/odf/odf-spark/.gitignore
@@ -0,0 +1,19 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark/pom.xml b/odf/odf-spark/pom.xml
new file mode 100755
index 0000000..378f280
--- /dev/null
+++ b/odf/odf-spark/pom.xml
@@ -0,0 +1,242 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>odf-spark</artifactId>
+ <packaging>jar</packaging>
+ <name>odf-spark</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-api</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-core</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <!-- Test-scoped ODF modules and test-jars provide the test infrastructure used when running this module's tests. -->
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-core</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-messaging</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-messaging</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-store</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- Workaround: Add odf-spark-example-application because dynamic jar load does not seem to work on IBM JDK -->
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-spark-example-application</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </dependency>
+ <!-- NOTE(review): junit uses the default compile scope. The main classes of this module shown here do not import it, so test scope may suffice; confirm before changing. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.11</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <!-- commons-io provides FileUtils and IOUtils used by SparkJars. -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+ <!-- The following Spark dependencies are needed for testing only. -->
+ <!-- Nevertheless, they have to be added as compile dependencies in order to become available to the SDPFactory. -->
+ <!-- NOTE(review): commons-codec is excluded from both Spark artifacts, presumably to avoid a version conflict on the class path; confirm. -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>2.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <!-- Package everything the dependency plugin below copies into ${project.build.directory}/downloads as class path resources (META-INF/spark/*.jar). -->
+ <resources>
+ <resource>
+ <directory>${project.build.directory}/downloads</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <!-- Additionally package this module's test classes as a test-jar. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Pass the ZooKeeper connection, log spec and project name to unit tests as system properties. -->
+ <!-- NOTE(review): "testZookeepeConnectionString" (sic) presumably matches the property name defined in the parent pom; confirm before renaming. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19</version>
+ <configuration>
+ <systemPropertyVariables>
+ <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+ <odf.logspec>${odf.unittest.logspec}</odf.logspec>
+ <odf.build.project.name>${project.name}</odf.build.project.name>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ <!-- In the validate phase, copy the odf-api, odf-spark-example-application and wink-json4j jars into
+ ${project.build.directory}/downloads/META-INF/spark so that they are packaged as resources (see <resources> above). -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>download-jar-file</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-api</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+ </artifactItem>
+ <!-- NOTE(review): absolute, Unix-specific output directory; presumably read by tests at runtime. Confirm before changing. -->
+ <artifactItem>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-spark-example-application</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>/tmp/odf-spark</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-spark-example-application</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.wink</groupId>
+ <artifactId>wink-json4j</artifactId>
+ <version>1.4</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ <includes>**/*</includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <!-- Integration tests: active unless the property reduced-tests is set to true. -->
+ <profile>
+ <id>integration-tests</id>
+ <activation>
+ <property>
+ <name>reduced-tests</name>
+ <value>!true</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <!-- Run SparkDiscoveryServiceLocalTest, found by scanning the odf-core dependency, with failsafe. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19</version>
+ <configuration>
+ <systemPropertyVariables>
+ <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+ <odf.logspec>${odf.integrationtest.logspec}</odf.logspec>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.apache.atlas.odf:odf-core</dependency>
+ </dependenciesToScan>
+ <includes>
+ <include>**/integrationtest/**/SparkDiscoveryServiceLocalTest.java</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verify</id>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
new file mode 100755
index 0000000..84ae80c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.text.MessageFormat;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * This class calls the actual Spark discovery services depending on the type of interface they implement.
+ * The class is used to run a Spark discovery service either on a local Spark cluster ({@link SparkServiceExecutorImpl})
+ * or on a remote Spark cluster ({@link SparkApplicationStub}).
+ * <p>
+ * Supported interface types of the service class:
+ * <ul>
+ * <li>{@code DataFrame}: the class exposes a static {@code processDataFrame(SparkSession, Dataset, String[])}
+ * method, which is invoked reflectively on a data frame built from a relational data set.</li>
+ * <li>{@code Generic}: the class implements {@link SparkDiscoveryService} and has a public no-argument
+ * constructor.</li>
+ * </ul>
+ * The Spark session is closed and stopped at the end of every {@link #checkDataSet} and {@link #runAnalysis}
+ * call, so an instance can serve a single call only.
+ */
+public class LocalSparkServiceExecutor implements SparkServiceExecutor {
+    private Logger logger = Logger.getLogger(LocalSparkServiceExecutor.class.getName());
+    private SparkSession spark;
+    private MetadataStore mds;
+
+    void setSparkSession(SparkSession spark) {
+        this.spark = spark;
+    }
+
+    void setMetadataStore(MetadataStore mds) {
+        this.mds = mds;
+    }
+
+    /**
+     * Checks whether the discovery service described by {@code dsProp} can access the data set.
+     * Failures while accessing the data are reported in the result as {@code NotPossible},
+     * including the stack trace.
+     *
+     * @param dsProp discovery service properties whose endpoint describes a Spark service
+     * @param container the data set to check
+     * @return the check result
+     * @throws RuntimeException if the endpoint cannot be converted; the Spark session is stopped first
+     */
+    @Override
+    public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsProp, DataSetContainer container) {
+        DiscoveryServiceSparkEndpoint endpoint;
+        try {
+            endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+        } catch (JSONException e1) {
+            // Release the session on this path too; it used to leak when the endpoint could not be parsed.
+            stopSparkSession();
+            throw new RuntimeException(e1);
+        }
+        DataSetCheckResult checkResult = new DataSetCheckResult();
+        try {
+            SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+            if (SERVICE_INTERFACE_TYPE.DataFrame.equals(inputMethod)) {
+                MetaDataObject dataSet = container.getDataSet();
+                if (!(dataSet instanceof RelationalDataSet)) {
+                    checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+                    checkResult.setDetails("This service can only process relational data sets.");
+                } else {
+                    checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+                    Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+                    // Print first rows to check whether data frame can be accessed
+                    df.show(10);
+                }
+            } else if (SERVICE_INTERFACE_TYPE.Generic.equals(inputMethod)) {
+                Class<?> clazz = Class.forName(endpoint.getClassName());
+                checkResult = createGenericService(clazz).checkDataSet(container);
+            } else {
+                // Report unsupported interface types explicitly, consistent with runAnalysis(), instead of
+                // returning a result whose data access was never set.
+                checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+                checkResult.setDetails(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
+            }
+        } catch (Exception e) {
+            logger.log(Level.WARNING, "Access to data set not possible.", e);
+            checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+            checkResult.setDetails(getExceptionAsString(e));
+        } finally {
+            stopSparkSession();
+        }
+        return checkResult;
+    }
+
+    /**
+     * Runs the discovery service described by {@code dsProp} on the request's data set.
+     * Any failure is reported as {@code UNKNOWN_ERROR}, with the stack trace as details.
+     *
+     * @param dsProp discovery service properties whose endpoint describes a Spark service
+     * @param request the analysis request
+     * @return the service response
+     */
+    @Override
+    public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsProp, DiscoveryServiceRequest request) {
+        DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+        response.setDetails("Annotations created successfully");
+        response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+        try {
+            DiscoveryServiceSparkEndpoint endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+            Class<?> clazz = Class.forName(endpoint.getClassName());
+            DataSetContainer container = request.getDataSetContainer();
+            String[] optionalArgs = {}; // For future use
+            SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+
+            if (SERVICE_INTERFACE_TYPE.DataFrame.equals(inputMethod)) {
+                if (!(container.getDataSet() instanceof RelationalDataSet)) {
+                    throw new RuntimeException("This service can only process relational data sets (DataFile or Table).");
+                }
+                Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+                // optionalArgs is passed as one String[] argument, matching the String[] parameter of processDataFrame.
+                @SuppressWarnings("unchecked")
+                Map<String, Dataset<Row>> annotationDataFrameMap = (Map<String, Dataset<Row>>) clazz
+                        .getMethod("processDataFrame", SparkSession.class, Dataset.class, String[].class)
+                        .invoke(null, this.spark, df, (Object) optionalArgs);
+                response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationDataFrameMap, this.mds));
+            } else if (SERVICE_INTERFACE_TYPE.Generic.equals(inputMethod)) {
+                response = createGenericService(clazz).runAnalysis(request);
+            } else {
+                throw new RuntimeException(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
+            }
+        } catch (Exception e) {
+            logger.log(Level.WARNING, "Error running discovery service.", e);
+            response.setDetails(getExceptionAsString(e));
+            response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+        } finally {
+            stopSparkSession();
+        }
+        return response;
+    }
+
+    /** Instantiates a generic Spark discovery service and hands it this executor's metadata store and session. */
+    private SparkDiscoveryService createGenericService(Class<?> clazz) throws ReflectiveOperationException {
+        Constructor<?> cons = clazz.getConstructor();
+        SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
+        service.setMetadataStore(this.mds);
+        service.setSparkSession(this.spark);
+        return service;
+    }
+
+    /** Closes and stops the Spark session once a check or analysis has finished. */
+    private void stopSparkSession() {
+        this.spark.close();
+        this.spark.stop();
+    }
+
+    /**
+     * Renders the stack trace of {@code exc} as a string.
+     *
+     * @param exc the throwable to render
+     * @return the printed stack trace
+     */
+    public static String getExceptionAsString(Throwable exc) {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        exc.printStackTrace(pw);
+        return sw.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
new file mode 100755
index 0000000..81fea2c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.atlas.odf.core.Utils;
+
+public class SparkJars {
+ private static Logger logger = Logger.getLogger(SparkJars.class.getName());
+
+ public String getResourceAsJarFile(String resource) {
+ ClassLoader cl = this.getClass().getClassLoader();
+ InputStream inputStream = cl.getResourceAsStream(resource);
+ if (inputStream == null) {
+ String msg = MessageFormat.format("Resource {0} was not found.", resource);
+ logger.log(Level.WARNING, msg);
+ throw new RuntimeException(msg);
+ }
+ String tempFilePath = null;
+ try {
+ File tempFile = File.createTempFile("driver", "jar");
+ tempFilePath = tempFile.getAbsolutePath();
+ logger.log(Level.INFO, "Creating temporary file " + tempFilePath);
+ IOUtils.copy(inputStream, new FileOutputStream(tempFile));
+ inputStream.close();
+ Utils.runSystemCommand("chmod 755 " + tempFilePath);
+ } catch (IOException e) {
+ String msg = MessageFormat.format("Error creating temporary file from resource {0}: ", resource);
+ logger.log(Level.WARNING, msg, e);
+ throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+ }
+ return tempFilePath;
+ }
+
+ public String getUrlasJarFile(String urlString) {
+ try {
+ File tempFile = File.createTempFile("driver", "jar");
+ logger.log(Level.INFO, "Creating temporary file " + tempFile);
+ FileUtils.copyURLToFile(new URL(urlString), tempFile);
+ Utils.runSystemCommand("chmod 755 " + tempFile.getAbsolutePath());
+ return tempFile.getAbsolutePath();
+ } catch (MalformedURLException e) {
+ String msg = MessageFormat.format("An invalid Spark application URL {0} was provided: ", urlString);
+ logger.log(Level.WARNING, msg, e);
+ throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Error processing Spark application jar file.", e);
+ throw new RuntimeException("Error processing Spark application jar file: " + Utils.getExceptionAsString(e));
+ }
+ }
+
+ public byte[] getFileAsByteArray(String resourceOrURL) {
+ try {
+ InputStream inputStream;
+ if (isValidUrl(resourceOrURL)) {
+ inputStream = new URL(resourceOrURL).openStream();
+ } else {
+ ClassLoader cl = this.getClass().getClassLoader();
+ inputStream = cl.getResourceAsStream(resourceOrURL);
+ if (inputStream == null) {
+ String msg = MessageFormat.format("Resource {0} was not found.", resourceOrURL);
+ logger.log(Level.WARNING, msg);
+ throw new RuntimeException(msg);
+ }
+ }
+ byte[] bytes = IOUtils.toByteArray(inputStream);
+ return bytes;
+ } catch (IOException e) {
+ String msg = MessageFormat.format("Error converting jar file {0} into byte array: ", resourceOrURL);
+ logger.log(Level.WARNING, msg, e);
+ throw new RuntimeException(msg + Utils.getExceptionAsString(e));
+ }
+ }
+
+ public static boolean isValidUrl(String urlString) {
+ try {
+ new URL(urlString);
+ return true;
+ } catch (java.net.MalformedURLException exc) {
+ // Expected exception if URL is not valid
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
new file mode 100755
index 0000000..720343b
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * Entry point for running Spark discovery services.
+ * <p>
+ * Reads the {@link SparkConfig} from the ODF settings and creates a Spark session for its cluster
+ * master URL. It then adds the service's application jar to the system class loader and delegates the
+ * actual work to a {@link LocalSparkServiceExecutor}. It is also used to prepare the local Spark cluster
+ * for unit and integration tests.
+ */
+public class SparkServiceExecutorImpl implements SparkServiceExecutor {
+    private Logger logger = Logger.getLogger(SparkServiceExecutorImpl.class.getName());
+
+    @Override
+    public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsri, DataSetContainer dataSetContainer) {
+        SparkServiceExecutor delegate = getExecutor(dsri);
+        return delegate.checkDataSet(dsri, dataSetContainer);
+    }
+
+    @Override
+    public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsri, DiscoveryServiceRequest request) {
+        SparkServiceExecutor delegate = getExecutor(dsri);
+        return delegate.runAnalysis(dsri, request);
+    }
+
+    /**
+     * Builds an executor bound to a fresh Spark session with the service's jar on the class path.
+     *
+     * @throws RuntimeException if the endpoint cannot be parsed, no Spark service is configured,
+     *         or the application jar cannot be loaded (the session is stopped in that case)
+     */
+    private SparkServiceExecutor getExecutor(DiscoveryServiceProperties dsri) {
+        SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+        DiscoveryServiceSparkEndpoint endpoint;
+        try {
+            endpoint = JSONUtils.convert(dsri.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+        } catch (JSONException jsonError) {
+            throw new RuntimeException(jsonError);
+        }
+
+        SparkConfig sparkConfig = settingsManager.getODFSettings().getSparkConfig();
+        if (sparkConfig == null) {
+            String msg = "No Spark service is configured. Please manually register Spark service or bind a Spark service to your ODF Bluemix app.";
+            logger.log(Level.SEVERE, msg);
+            throw new RuntimeException(msg);
+        }
+
+        logger.log(Level.INFO, "Using local Spark cluster {0}.", sparkConfig.getClusterMasterUrl());
+        SparkSession spark = SparkSession.builder()
+                .master(sparkConfig.getClusterMasterUrl())
+                .appName(dsri.getName())
+                .getOrCreate();
+        try {
+            addApplicationJarToSystemClassPath(endpoint.getJar());
+        } catch (Exception loadError) {
+            String msg = MessageFormat.format("Error loading jar file {0} implementing the Spark discovery service: ", endpoint.getJar());
+            logger.log(Level.WARNING, msg, loadError);
+            spark.close();
+            spark.stop();
+            throw new RuntimeException(msg, loadError);
+        }
+
+        LocalSparkServiceExecutor executor = new LocalSparkServiceExecutor();
+        executor.setSparkSession(spark);
+        executor.setMetadataStore(new ODFFactory().create().getMetadataStore());
+        return executor;
+    }
+
+    /**
+     * Materializes the jar containing the Spark job as a local file and appends it to the system class loader.
+     * NOTE(review): relies on the system class loader being a URLClassLoader and on reflective access to
+     * its protected addURL method; presumably this only holds on Java 8 and earlier. Confirm the target JDK.
+     */
+    private void addApplicationJarToSystemClassPath(String jar) throws ReflectiveOperationException, java.net.MalformedURLException {
+        SparkJars sparkJars = new SparkJars();
+        URLClassLoader systemLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+        Method addUrl = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+        addUrl.setAccessible(true);
+        String applicationJarFile = SparkJars.isValidUrl(jar)
+                ? sparkJars.getUrlasJarFile(jar)
+                : sparkJars.getResourceAsJarFile(jar);
+        logger.log(Level.INFO, "Using application jar file {0}.", applicationJarFile);
+        addUrl.invoke(systemLoader, new URL("file:" + applicationJarFile));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..d6651ee
--- /dev/null
+++ b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+SparkServiceExecutor=org.apache.atlas.odf.core.spark.SparkServiceExecutorImpl
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-store/.gitignore b/odf/odf-store/.gitignore
new file mode 100755
index 0000000..ea5ddb8
--- /dev/null
+++ b/odf/odf-store/.gitignore
@@ -0,0 +1,5 @@
+.settings
+target
+.classpath
+.project
+.factorypath
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-store/pom.xml b/odf/odf-store/pom.xml
new file mode 100755
index 0000000..3d0a93d
--- /dev/null
+++ b/odf/odf-store/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>odf-store</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-core</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-messaging</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.6</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas.odf</groupId>
+ <artifactId>odf-core</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19</version>
+ <configuration>
+ <systemPropertyVariables>
+ <odf.logspec>${odf.unittest.logspec}</odf.logspec>
+ <odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+ <odf.build.project.name>${project.name}</odf.build.project.name>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.apache.atlas.odf:odf-core</dependency>
+ </dependenciesToScan>
+ <includes>
+ <include>**/configuration/**/*.java</include>
+ <include>**/ZookeeperConfigurationStorageTest.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
new file mode 100755
index 0000000..3ea9927
--- /dev/null
+++ b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.store.zookeeper34;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+public class ZookeeperConfigurationStorage implements ODFConfigurationStorage {
+ private Logger logger = Logger.getLogger(ZookeeperConfigurationStorage.class.getName());
+ static final String ZOOKEEPER_CONFIG_PATH = "/odf/config";
+ static String configCache = null; // cache is a string so that the object is not accidentally modified
+ static Object configCacheLock = new Object();
+ static HashSet<String> pendingConfigChanges = new HashSet<String>();
+
+ String zookeeperString;
+
+ public ZookeeperConfigurationStorage() {
+ zookeeperString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+ }
+
+ public void clearCache() {
+ synchronized (configCacheLock) {
+ configCache = null;
+ }
+ }
+
+ @Override
+ public void storeConfig(ConfigContainer config) {
+ synchronized (configCacheLock) {
+ ZooKeeper zk = null;
+ String configTxt = null;
+ try {
+ configTxt = JSONUtils.toJSON(config);
+ zk = getZkConnectionSynchronously();
+ if (zk.exists(getZookeeperConfigPath(), false) == null) {
+ //config file doesn't exist in zookeeper yet, write default config
+ logger.log(Level.WARNING, "Zookeeper config not found - creating it before writing: {0}", configTxt);
+ initializeConfiguration(zk, configTxt);
+ }
+ zk.setData(getZookeeperConfigPath(), configTxt.getBytes("UTF-8"), -1);
+ configCache = configTxt;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException("A zookeeper connection could not be established in time to write settings");
+ } catch (KeeperException e) {
+ if (Code.NONODE.equals(e.code())) {
+ logger.info("Setting could not be written, the required node is not available!");
+ initializeConfiguration(zk, configTxt);
+ return;
+ }
+ //This should never happen! Only NoNode or BadVersion codes are possible. Because the file version is ignored, a BadVersion should never occur
+ throw new RuntimeException("A zookeeper connection could not be established because of an unknown exception", e);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("A zookeeper connection could not be established because of an incorrect encoding");
+ } catch (JSONException e) {
+ throw new RuntimeException("Configuration is not valid", e);
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public ConfigContainer getConfig(ConfigContainer defaultConfiguration) {
+ synchronized (configCacheLock) {
+ if (configCache == null) {
+ ZooKeeper zk = getZkConnectionSynchronously();
+ try {
+ if (zk.exists(getZookeeperConfigPath(), false) == null) {
+ //config file doesn't exist in zookeeper yet, write default config
+ String defaultConfigString = JSONUtils.toJSON(defaultConfiguration);
+ logger.log(Level.WARNING, "Zookeeper config not found - creating now with default: {0}", defaultConfigString);
+ initializeConfiguration(zk, defaultConfigString);
+ }
+ byte[] configBytes = zk.getData(getZookeeperConfigPath(), true, new Stat());
+ if (configBytes != null) {
+ String configString = new String(configBytes, "UTF-8");
+ configCache = configString;
+ } else {
+ // should never happen
+ throw new RuntimeException("Zookeeper configuration was not stored");
+ }
+ } catch (KeeperException e) {
+ throw new RuntimeException(MessageFormat.format("Zookeeper config could not be read, {0} Zookeeper exception occured!", e.code().name()), e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Zookeeper config could not be read, the connection was interrupded", e);
+ } catch (IOException | JSONException e) {
+ throw new RuntimeException("Zookeeper config could not be read, the file could not be parsed correctly", e);
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+ }
+ try {
+ return JSONUtils.fromJSON(configCache, ConfigContainer.class);
+ } catch (JSONException e) {
+ throw new RuntimeException("Cached configuration was not valid", e);
+ }
+ }
+ }
+
+ private void initializeConfiguration(ZooKeeper zk, String config) {
+ try {
+ if (getZookeeperConfigPath().contains("/")) {
+ String[] nodes = getZookeeperConfigPath().split("/");
+ StringBuilder path = new StringBuilder();
+ for (String node : nodes) {
+ if (node.trim().equals("")) {
+ //ignore empty paths
+ continue;
+ }
+ path.append("/" + node);
+ try {
+ zk.create(path.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (NodeExistsException ex) {
+ //ignore if node already exists and continue with next node
+ }
+ }
+ }
+
+ //use version -1 to ignore versioning conflicts
+ try {
+ zk.setData(getZookeeperConfigPath(), config.toString().getBytes("UTF-8"), -1);
+ } catch (UnsupportedEncodingException e) {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ } catch (KeeperException e) {
+ throw new RuntimeException(MessageFormat.format("The zookeeper config could not be initialized, a Zookeeper exception of type {0} occured!", e.code().name()), e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("The zookeeper config could not be initialized, the connection got interrupted!", e);
+ }
+ }
+
+ private ZooKeeper getZkConnectionSynchronously() {
+ final CountDownLatch latch = new CountDownLatch(1);
+ logger.log(Level.FINE, "Trying to connect to zookeeper at {0}", zookeeperString);
+ ZooKeeper zk = null;
+ try {
+ int timeout = 5;
+ zk = new ZooKeeper(zookeeperString, timeout * 1000, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState().equals(Watcher.Event.KeeperState.ConnectedReadOnly) || event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+ //count down latch, connected successfully to zk
+ latch.countDown();
+ }
+ }
+ });
+ //block thread till countdown, maximum of "timeout" seconds
+ latch.await(5 * timeout, TimeUnit.SECONDS);
+ if (latch.getCount() > 0) {
+ zk.close();
+ throw new RuntimeException("The zookeeper connection could not be retrieved on time!");
+ }
+ return zk;
+ } catch (IOException e1) {
+ throw new RuntimeException("The zookeeper connection could not be retrieved, the connection failed!", e1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Zookeeper connection could not be retrieved, the thread was interrupted!", e);
+ }
+ }
+
+ public String getZookeeperConfigPath() {
+ return ZOOKEEPER_CONFIG_PATH;
+ }
+
+ @Override
+ public void onConfigChange(ConfigContainer container) {
+ synchronized (configCacheLock) {
+ try {
+ configCache = JSONUtils.toJSON(container);
+ } catch (JSONException e) {
+ throw new RuntimeException("Config could not be cloned!", e);
+ }
+ }
+ }
+
+ @Override
+ public void addPendingConfigChange(String changeId) {
+ synchronized (configCacheLock) {
+ pendingConfigChanges.add(changeId);
+ }
+ }
+
+ @Override
+ public void removePendingConfigChange(String changeId) {
+ synchronized (configCacheLock) {
+ pendingConfigChanges.remove(changeId);
+ }
+ }
+
+ @Override
+ public boolean isConfigChangePending(String changeId) {
+ synchronized (configCacheLock) {
+ return pendingConfigChanges.contains(changeId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..65a7b5d
--- /dev/null
+++ b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ODFConfigurationStorage=org.apache.atlas.odf.core.store.zookeeper34.ZookeeperConfigurationStorage