You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/09/29 12:45:26 UTC

[49/50] [abbrv] airavata git commit: [AIRAVATA-2065] Introduce more robust logging mechanism for Airavata

[AIRAVATA-2065] Introduce more robust logging mechanism for Airavata

Summary:
1. Add a configurable KafkaAppender (turned off by default) which pushes
the Airavata logs to a Kafka topic.

2. Use the configured Kafka topic prefix and construct the topic name based on
the Airavata role that is running.

3. Bring back logback as the logging library.
4. Use `git describe` in the server version string and introduce a new serverId with more metadata.
5. Add a new class specifically to handle AWS deployments (uses the AWS metadata service).
6. Add MDC context to orchestrator and gfac thread executions.

Test Plan: Deployed an Airavata setup in AWS with an ES cluster and tested the logs.

Reviewers: supun, ajinkya, suresh

Differential Revision: https://airavata.exana.io/D2


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/342400cf
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/342400cf
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/342400cf

Branch: refs/heads/lahiru/AIRAVATA-2065
Commit: 342400cf310a8cd9d3350751f31e6895e4ecf603
Parents: 59142dd
Author: Lahiru Ginnaliya Gamathige <la...@apache.org>
Authored: Sun Aug 21 00:49:58 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <la...@apache.org>
Committed: Wed Sep 28 00:29:05 2016 -0700

----------------------------------------------------------------------
 modules/commons/pom.xml                         |  12 +-
 .../airavata/common/utils/BuildConstant.java    |  26 ++++
 .../airavata/common/logging/Exception.java      |  66 +++++++++
 .../airavata/common/logging/LogEntry.java       | 132 ++++++++++++++++++
 .../airavata/common/logging/MDCConstants.java   |  30 ++++
 .../apache/airavata/common/logging/MDCUtil.java |  32 +++++
 .../airavata/common/logging/ServerId.java       |  68 +++++++++
 .../common/logging/kafka/KafkaAppender.java     | 115 ++++++++++++++++
 .../airavata/common/utils/AiravataZKUtils.java  |  13 +-
 .../common/utils/ApplicationSettings.java       |  34 +++--
 .../airavata/common/utils/AwsMetadata.java      | 137 +++++++++++++++++++
 .../airavata/common/utils/ServerSettings.java   |  34 +++++
 .../main/resources/airavata-server.properties   |   8 +-
 modules/distribution/pom.xml                    |  19 +--
 .../gfac/bes/provider/impl/BESProvider.java     |   5 +-
 .../impl/JSDLGeneratorTestWithMyProxyAuth.java  |   2 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |   2 +-
 .../gfac/core/context/ProcessContext.java       |   4 +-
 .../airavata/gfac/core/context/TaskContext.java |   4 +-
 .../gfac/impl/task/BESJobSubmissionTask.java    |  11 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   |  16 +--
 .../services/impl/BigRed2TestWithSSHAuth.java   |   2 +-
 .../impl/GSISSHProviderTestWithMyProxyAuth.java |   2 +-
 .../impl/SSHProviderTestWithSSHAuth.java        |   2 +-
 .../airavata/gfac/server/GfacServerHandler.java |  42 +++---
 .../server/OrchestratorServerHandler.java       |  33 +++--
 modules/registry/registry-core/pom.xml          |  27 ----
 .../org/apache/airavata/server/ServerMain.java  |  38 ++++-
 .../utils/PropertyReader.java                   |   3 +-
 .../core/AbstractThriftDeserializer.java        |   3 +-
 pom.xml                                         |  65 +++++++++
 31 files changed, 873 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/pom.xml b/modules/commons/pom.xml
index a2136c6..f4231ce 100644
--- a/modules/commons/pom.xml
+++ b/modules/commons/pom.xml
@@ -115,7 +115,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -127,12 +126,21 @@
             <artifactId>libthrift</artifactId>
             <version>${thrift.version}</version>
         </dependency>
-
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
             <version>${google.gson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java b/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
new file mode 100644
index 0000000..8cf5ddf
--- /dev/null
+++ b/modules/commons/src/main/java-templates/org/apache/airavata/common/utils/BuildConstant.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.utils;
+
+public class BuildConstant {
+    public static final String VERSION = "${git-describe}"; // template placeholder; presumably replaced with `git describe` output at build time — TODO confirm in the build config
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
new file mode 100644
index 0000000..cea0c95
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/Exception.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+public class Exception {
+    // Plain holder for a throwable's message, stack-trace strings and class name. NOTE: inside this package the simple name shadows java.lang.Exception.
+    private String message;
+
+    private String[] stackTrace;
+
+    private String className;
+    // Two-argument form: className is not assigned and therefore stays null.
+    public Exception(String message, String[] stackTrace) {
+        this.message = message;
+        this.stackTrace = stackTrace;
+    }
+    // Three-argument form: additionally records the class name supplied by the caller.
+    public Exception(String message, String[] stackTrace, String className) {
+        this.message = message;
+        this.stackTrace = stackTrace;
+        this.className = className;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String[] getStackTrace() {
+        return stackTrace;
+    }
+
+    public void setStackTrace(String[] stackTrace) {
+        this.stackTrace = stackTrace;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
new file mode 100644
index 0000000..72fc4a0
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/LogEntry.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+import java.lang.*;
+import java.util.Map;
+
+public class LogEntry {
+    // Mutable value object for one log event: server identity, message, timestamp, level, logger name, MDC map, thread name and optional exception.
+    private ServerId serverId;
+
+    private String message;
+
+    private String timestamp;
+
+    private String level;
+
+    private String loggerName;
+
+    private Map<String, String> mdc;
+
+    private String threadName;
+    // "Exception" resolves to the same-package org.apache.airavata.common.logging.Exception, not java.lang.Exception.
+    private Exception exception;
+    // Full constructor: stores every argument as given, including the exception details.
+    public LogEntry(ServerId serverId, String message, String timestamp, String level, String loggerName, Map<String,
+            String> mdc, String threadName, Exception exception) {
+        this.serverId = serverId;
+        this.message = message;
+        this.timestamp = timestamp;
+        this.level = level;
+        this.loggerName = loggerName;
+        this.mdc = mdc;
+        this.threadName = threadName;
+        this.exception = exception;
+    }
+    // Constructor for events without a throwable: the exception field is not assigned and stays null.
+    public LogEntry(ServerId serverId, String message, String timestamp, String level, String loggerName, Map<String,
+            String> mdc, String threadName) {
+        this.serverId = serverId;
+        this.message = message;
+        this.timestamp = timestamp;
+        this.level = level;
+        this.loggerName = loggerName;
+        this.mdc = mdc;
+        this.threadName = threadName;
+    }
+
+    // Plain accessors and mutators follow; none of them validates or copies its argument.
+    public ServerId getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(ServerId serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getLevel() {
+        return level;
+    }
+
+    public void setLevel(String level) {
+        this.level = level;
+    }
+
+    public String getLoggerName() {
+        return loggerName;
+    }
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    public Map<String, String> getMdc() {
+        return mdc;
+    }
+
+    public void setMdc(Map<String, String> mdc) {
+        this.mdc = mdc;
+    }
+
+    public String getThreadName() {
+        return threadName;
+    }
+
+    public void setThreadName(String threadName) {
+        this.threadName = threadName;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
new file mode 100644
index 0000000..487bce0
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCConstants.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+
+public class MDCConstants { // String constants; presumably the key names used for slf4j MDC entries — TODO confirm against callers
+    public static final String EXPERIMENT_ID = "experiment_id";
+    public static final String GATEWAY_ID = "gateway_id";
+    public static final String EXPERIMENT_NAME = "experiment_name";
+    public static final String PROCESS_ID = "process_id";
+    public static final String TOKEN_ID = "token_id";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
new file mode 100644
index 0000000..4549d25
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/MDCUtil.java
@@ -0,0 +1,32 @@
+///
+// Copyright (c) 2016. Highfive Technologies, Inc.
+///
+package org.apache.airavata.common.logging;
+import org.slf4j.MDC;
+
+import java.util.Map;
+
+public class MDCUtil { // Helper for carrying the slf4j MDC context map into a Runnable that is executed later.
+    public static Runnable wrapWithMDC(Runnable r) { // returns a Runnable that runs r under the MDC captured here
+        Map<String, String> mdc = MDC.getCopyOfContextMap(); // snapshot taken when wrapWithMDC is called; may be null
+        return () -> {
+            Map<String, String> oldMdc = MDC.getCopyOfContextMap(); // context present when the wrapper actually runs
+            // Install the captured context (or clear it when none was captured) before running r.
+            if (mdc == null) {
+                MDC.clear();
+            } else {
+                MDC.setContextMap(mdc);
+            }
+            try {
+                r.run();
+            } finally {
+                if (oldMdc == null) {
+                    MDC.clear();
+                } else {
+                    MDC.setContextMap(oldMdc);
+                }
+            }
+            // The finally block above restores the previous context even when r.run() throws.
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
new file mode 100644
index 0000000..9611302
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/ServerId.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging;
+
+public class ServerId { // Mutable holder for a server's id, host name, version string and role list.
+    private String serverId;
+    private String hostName;
+    private String version;
+
+    private String[] roles; // gfac, orchestrator, apiserver,
+    // Stores the four values exactly as given; no validation is done and the roles array is not copied.
+    public ServerId(String serverId, String hostName, String version, String[] roles) {
+        this.serverId = serverId;
+        this.hostName = hostName;
+        this.version = version;
+        this.roles = roles;
+    }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(String serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String[] getRoles() {
+        return roles;
+    }
+
+    public void setRoles(String[] roles) {
+        this.roles = roles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
new file mode 100644
index 0000000..0617bcf
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging.kafka;
+
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
+import com.google.gson.Gson;
+import org.apache.airavata.common.logging.Exception;
+import org.apache.airavata.common.logging.LogEntry;
+import org.apache.airavata.common.logging.ServerId;
+import org.apache.airavata.common.utils.AwsMetadata;
+import org.apache.airavata.common.utils.BuildConstant;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Properties;
+
+/** Logback appender that serializes each log event as a JSON {@link LogEntry} and sends it to a Kafka topic. */
+public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
+    private final static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
+    private final Gson gson = new Gson(); // one shared instance instead of building a new Gson for every event
+    private final Producer<String, String> producer;
+    private final String kafkaTopic;
+    private final ServerId serverId;
+
+    /** Creates the producer; kafkaHost is used as "bootstrap.servers", kafkaTopicPrefix as the topic-name prefix. */
+    public KafkaAppender(String kafkaHost, String kafkaTopicPrefix) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", kafkaHost);
+        props.put("acks", "0");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 10000); // Send the batch every 10 seconds
+        props.put("buffer.memory", 33554432);
+        props.put("producer.type", "async");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        this.kafkaTopic = createKafkaTopic(kafkaTopicPrefix);
+        logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", kafkaHost, this.kafkaTopic);
+        this.producer = new KafkaProducer<>(props);
+        if (ServerSettings.isRunningOnAws()) {
+            final AwsMetadata awsMetadata = new AwsMetadata();
+            serverId = new ServerId(awsMetadata.getId(), awsMetadata.getHostname(),
+                    BuildConstant.VERSION, ServerSettings.getServerRoles());
+        } else {
+            serverId = new ServerId(ServerSettings.getIp(), ServerSettings.getIp(),
+                    BuildConstant.VERSION, ServerSettings.getServerRoles());
+        }
+    }
+
+    /** Builds a {@link LogEntry} from the event (with throwable details when present) and sends it as JSON. */
+    @Override
+    protected void append(ILoggingEvent event) {
+        event.prepareForDeferredProcessing();
+        //todo do more elegant streaming approach to publish logs
+        final Level level = event.getLevel();
+        if (level.equals(Level.ALL) || level.equals(Level.OFF)) {
+            return; // OFF AND ALL are not loggable levels
+        }
+        final IThrowableProxy throwableProxy = event.getThrowableProxy();
+        final Exception exception = throwableProxy == null ? null : new Exception(throwableProxy.getMessage(),
+                toStringArray(throwableProxy.getStackTraceElementProxyArray()), throwableProxy.getClassName());
+        // getFormattedMessage() has the arguments substituted; getMessage() is only the raw "{}" pattern.
+        final LogEntry entry = new LogEntry(serverId, event.getFormattedMessage(),
+                Instant.ofEpochMilli(event.getTimeStamp()).toString(), level.toString(),
+                event.getLoggerName(), event.getMDCPropertyMap(), event.getThreadName(), exception);
+        producer.send(new ProducerRecord<>(kafkaTopic, gson.toJson(entry)));
+    }
+
+    private String[] toStringArray(StackTraceElementProxy[] stackTraceElement) {
+        return stackTraceElement == null ? new String[0] // a proxy without frames yields an empty array, not an NPE
+                : Arrays.stream(stackTraceElement).map(StackTraceElementProxy::getSTEAsString).toArray(String[]::new);
+    }
+
+    /** Returns "prefix_all_logs" for four or more roles, else "prefix_firstRole_logs"; assumes at least one role. */
+    private String createKafkaTopic(String kafkaTopicPrefix) {
+        final String[] serverRoles = ServerSettings.getServerRoles();
+        if (serverRoles.length >= 4) {
+            return String.format("%s_all_logs", kafkaTopicPrefix);
+        }
+        return String.format("%s_%s_logs", kafkaTopicPrefix, serverRoles[0]);
+    }
+
+    public void close() {
+        producer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 5faf985..75f91fd 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -119,13 +119,14 @@ public class AiravataZKUtils implements Watcher {
     public static void startEmbeddedZK(ServerCnxnFactory cnxnFactory) {
         if (ServerSettings.isEmbeddedZK()) {
             ServerConfig serverConfig = new ServerConfig();
-            URL resource = AiravataZKUtils.class.getClassLoader().getResource("zoo.cfg");
-            if (resource == null) {
-                logger.error("There is no zoo.cfg file in the classpath... Failed to start Zookeeper Server");
-                System.exit(1);
-            }
+            URL resource = ApplicationSettings.loadFile("zoo.cfg");
             try {
+                if (resource == null) {
+                    logger.error("There is no zoo.cfg file in the classpath... Failed to start Zookeeper Server");
+                    System.exit(1);
+                }
                 serverConfig.parse(resource.getPath());
+
             } catch (QuorumPeerConfig.ConfigException e) {
                 logger.error("Error while starting embedded Zookeeper", e);
                 System.exit(2);
@@ -193,7 +194,5 @@ public class AiravataZKUtils implements Watcher {
         } else {
             return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(cancelDeliveryTagPath));
         }
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
index 9ce0786..dc7944f 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ApplicationSettings.java
@@ -25,9 +25,11 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.*;
 import java.util.regex.Matcher;
@@ -35,10 +37,12 @@ import java.util.regex.Pattern;
 
 public class ApplicationSettings {
     public static final String SERVER_PROPERTIES="airavata-server.properties";
-    
-	public static String ADDITIONAL_SETTINGS_FILES = "external.settings";
+    public static final String AIRAVATA_CONFIG_DIR = "airavata.config.dir";
+
+    public static String ADDITIONAL_SETTINGS_FILES = "external.settings";
 
 	protected Properties properties = new Properties();
+
     private Exception propertyLoadException;
 
 
@@ -63,7 +67,6 @@ public class ApplicationSettings {
 	private void loadProperties() {
 		URL url = getPropertyFileURL();
         try {
-        	
             properties.load(url.openStream());
             logger.info("Settings loaded from "+url.toString());
             URL[] externalSettingsFileURLs = getExternalSettingsFileURLs();
@@ -77,7 +80,7 @@ public class ApplicationSettings {
 	}
 
 	protected URL getPropertyFileURL() {
-		return ApplicationSettings.class.getClassLoader().getResource(SERVER_PROPERTIES);
+		return ApplicationSettings.loadFile(SERVER_PROPERTIES);
 	}
 	
 	protected URL[] getExternalSettingsFileURLs(){
@@ -86,7 +89,7 @@ public class ApplicationSettings {
 			String externalSettingsFileNames = getSettingImpl(ADDITIONAL_SETTINGS_FILES);
 			String[] externalSettingFiles = externalSettingsFileNames.split(",");
 			for (String externalSettingFile : externalSettingFiles) {
-				URL externalSettingFileURL = ApplicationSettings.class.getClassLoader().getResource(externalSettingFile);
+				URL externalSettingFileURL = ApplicationSettings.loadFile(externalSettingFile);
 				if (externalSettingFileURL==null){
 					logger.warn("Could not file external settings file "+externalSettingFile);
 				}else{
@@ -313,11 +316,11 @@ public class ApplicationSettings {
      * Static methods which will be used by the users
      */
     
-    public static String getSetting(String key) throws ApplicationSettingsException{
+    public static String getSetting(String key) throws ApplicationSettingsException {
     	return getInstance().getSettingImpl(key);
     }
-    
-    public static String getSetting(String key, String defaultValue){
+
+    public static String getSetting(String key, String defaultValue) {
     	return getInstance().getSettingImpl(key,defaultValue);
 
     }
@@ -426,4 +429,19 @@ public class ApplicationSettings {
     public static ShutdownStrategy getShutdownStrategy() throws Exception{
     	return getInstance().getShutdownStrategyImpl();
     }
+
+    public static URL loadFile(String fileName) {
+        final URL resource = ApplicationSettings.class.getClassLoader().getResource(fileName);
+        if(resource == null) {
+            if(System.getProperty(AIRAVATA_CONFIG_DIR) != null) {
+                final String airavataConfigDir = System.getProperty(AIRAVATA_CONFIG_DIR);
+                try {
+                     return new File(airavataConfigDir + File.separator + fileName).toURI().toURL();
+                } catch (MalformedURLException e) {
+                    logger.error("Error parsing the file from airavata.config.dir", airavataConfigDir);
+                }
+            }
+        }
+        return resource;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
new file mode 100644
index 0000000..f9b4a65
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/AwsMetadata.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
+import com.google.gson.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/** Lazily reads and caches details of the current instance from the EC2 metadata service. */
+public class AwsMetadata {
+    private static final Logger log = LoggerFactory.getLogger(AwsMetadata.class);
+    private static final String METADATA_URI_BASE = "http://169.254.169.254/";
+    private static final String REGION_SUFFIX = "/latest/dynamic/instance-identity/document";
+    private static final String ZONE_SUFFIX = "/latest/meta-data/placement/availability-zone";
+    private static final String PUBLIC_IPV4_SUFFIX = "/latest/meta-data/public-ipv4";
+    private static final String PRIVATE_IPV4_SUFFIX = "/latest/meta-data/local-ipv4";
+    private static final String HOSTNAME_SUFFIX = "/latest/meta-data/hostname";
+    private static final String ID_SUFFIX = "/latest/meta-data/instance-id";
+    private static final int TIMEOUT_MILLIS = 2000; // connect/read bound so non-AWS hosts fail fast
+
+    private final URI baseUri;
+    private String id;
+    private String region;
+    private String hostname;
+    private String zone;
+    private InetAddress publicIp;
+    private InetAddress privateIp;
+
+    /** Creates a client rooted at METADATA_URI_BASE; the constant is a legal URI, so create() cannot fail. */
+    public AwsMetadata() {
+        baseUri = URI.create(METADATA_URI_BASE);
+    }
+
+    /** Returns the region from the instance identity document, or null when it cannot be determined. */
+    public String getRegion() {
+        final String dynamicData = region == null ? getMetadataAt(REGION_SUFFIX) : null;
+        if (dynamicData != null) {
+            try {
+                JsonElement value = new JsonParser().parse(dynamicData).getAsJsonObject().get("region");
+                // A missing or non-primitive "region" member leaves the cache empty instead of throwing.
+                region = value != null && value.isJsonPrimitive() ? value.getAsString() : null;
+            } catch (JsonParseException | IllegalStateException | ClassCastException e) {
+                log.error("Unable to get region, expecting a JSON Object", e);
+            }
+        }
+        return region;
+    }
+
+    /** Returns the availability zone, or null when the metadata lookup fails. */
+    public String getZoneName() {
+        if (zone == null) {
+            zone = getMetadataAt(ZONE_SUFFIX);
+        }
+        return zone;
+    }
+
+    /** Returns the instance id, or null when the metadata lookup fails. */
+    public String getId() {
+        if (id == null) {
+            id = getMetadataAt(ID_SUFFIX);
+        }
+        return id;
+    }
+
+    /** Returns the instance hostname, or null when the metadata lookup fails. */
+    public String getHostname() {
+        if (hostname == null) {
+            hostname = getMetadataAt(HOSTNAME_SUFFIX);
+        }
+        return hostname;
+    }
+
+    /** Returns the public IPv4 address parsed from the metadata, or null when the lookup fails. */
+    public InetAddress getPublicIpAddress() {
+        final String ip = publicIp == null ? getMetadataAt(PUBLIC_IPV4_SUFFIX) : null;
+        if (ip != null) {
+            publicIp = InetAddresses.forString(ip);
+        }
+        return publicIp;
+    }
+
+    /** Returns the private IPv4 address parsed from the metadata, or null when the lookup fails. */
+    public InetAddress getInternalIpAddress() {
+        final String ip = privateIp == null ? getMetadataAt(PRIVATE_IPV4_SUFFIX) : null;
+        if (ip != null) {
+            privateIp = InetAddresses.forString(ip);
+        }
+        return privateIp;
+    }
+
+    /** Reads the document at suffix with line breaks removed; returns null on any failure. */
+    private String getMetadataAt(String suffix) {
+        try {
+            java.net.URLConnection connection = baseUri.resolve(suffix).toURL().openConnection();
+            connection.setConnectTimeout(TIMEOUT_MILLIS);
+            connection.setReadTimeout(TIMEOUT_MILLIS);
+            StringBuilder builder = new StringBuilder();
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                    builder.append(line);
+                }
+            }
+            return builder.toString();
+        } catch (Exception e) {
+            // Expected on hosts outside AWS; debug level keeps local servers from going verbose.
+            log.debug("Unable to read AWS metadata at {}", suffix, e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index bb11264..3ac2a6e 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class ServerSettings extends ApplicationSettings {
 
@@ -108,6 +110,15 @@ public class ServerSettings extends ApplicationSettings {
     private static final String EMAIL_BASED_MONITOR_STORE_PROTOCOL = "email.based.monitor.store.protocol";
     private static final String ENABLE_EMAIL_BASED_MONITORING = "enable.email.based.monitoring";
 
+    private static final String IS_RUNNING_ON_AWS = "isRunningOnAws";
+    private static final String ENABLE_KAFKA_LOGGING = "enable.kafka.logging";
+    private static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+    private static final String KAFKA_TOPIC_PREFIX = "kafka.topic.prefix";
+    private static final String SERVER_ROLES = "server.roles";
+
+    // todo until AIRAVATA-2066 is finished, keep server side list configurations here.
+    private static Map<String, String[]> listConfigurations = new HashMap<>();
+
     private static boolean stopAllThreads = false;
     private static boolean emailBaseNotificationEnable;
     private static String outputLocation;
@@ -405,4 +416,34 @@ public class ServerSettings extends ApplicationSettings {
     public static Boolean isEnableSharing() throws ApplicationSettingsException {
         return Boolean.parseBoolean(getSetting(ENABLE_SHARING));
     }
+
+    /** Returns true only when the isRunningOnAws setting parses as "true"; "false" is the default value. */
+    public static boolean isRunningOnAws() {
+        return Boolean.parseBoolean(getSetting(IS_RUNNING_ON_AWS, "false"));
+    }
+
+    /** Returns the kafka.broker.list setting, using null as the default value. */
+    public static String getKafkaBrokerList() {
+        return getSetting(KAFKA_BROKER_LIST, null);
+    }
+
+    /** Returns the kafka.topic.prefix setting, using "all" as the default value. */
+    public static String getKafkaTopicPrefix() {
+        return getSetting(KAFKA_TOPIC_PREFIX, "all");
+    }
+
+    /** Returns true only when enable.kafka.logging parses as "true"; "false" is the default value. */
+    public static boolean isEnabledKafkaLogging() {
+        return Boolean.parseBoolean(getSetting(ENABLE_KAFKA_LOGGING, "false"));
+    }
+
+    /** Stores the given roles array (the reference, not a copy) under the server.roles key. */
+    public static void setServerRoles(String[] roles) {
+        listConfigurations.put(SERVER_ROLES, roles);
+    }
+
+    /** Returns the array stored under the server.roles key, or null when there is no such entry. */
+    public static String[] getServerRoles() {
+        return listConfigurations.get(SERVER_ROLES);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 30a6338..aa84353 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -285,4 +285,10 @@ authorization.policy=airavata-default-xacml-policy
 #### authorization cache related configuration ####
 authz.cache.enabled=true
 authz.cache.manager.class=org.apache.airavata.api.server.security.authzcache.DefaultAuthzCacheManager
-in.memory.cache.size=1000
\ No newline at end of file
+in.memory.cache.size=1000
+
+# Kafka Logging related configuration
+isRunningOnAws=false
+kafka.broker.list=localhost:9092
+kafka.topic.prefix=local
+enable.kafka.logging=false

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index 78d9bb9..d096739 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -164,14 +164,6 @@
             <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk</artifactId>
             <version>1.9.0</version>
@@ -559,7 +551,16 @@
             <artifactId>curator-framework</artifactId>
             <version>${curator.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
         <!-- ======================== Sample =================== -->
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index b287b8e..b8d06df 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -144,10 +144,9 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
             JobDetails jobDetails = new JobDetails();
             FactoryClient factory = new FactoryClient(eprt, secProperties);
 
-            log.info(String.format("Activity Submitting to %s ... \n",
-                    factoryUrl));
+            log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
             CreateActivityResponseDocument response = factory.createActivity(cad);
-            log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+            log.info("Activity Submitted to {} \n", factoryUrl);
 
             EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java b/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java
index cf9b82a..90f67a1 100644
--- a/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-bes/src/test/java/org/apache/airavata/core/gfac/services/impl/JSDLGeneratorTestWithMyProxyAuth.java
@@ -106,7 +106,7 @@ import static org.junit.Assert.assertTrue;
 //	}
 //
 //	protected GFacConfiguration getGFACConfig() throws Exception{
-//        URL resource = BESProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+//        URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
 //        System.out.println(resource.getFile());
 //        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()),null,null);
 //		return gFacConfiguration;

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index cc1f17b..7e2154e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -809,7 +809,7 @@ public class GFacUtils {
     public static File createJobFile(TaskContext taskContext, JobDescriptor jobDescriptor, JobManagerConfiguration jobManagerConfiguration) throws GFacException {
         try {
             TransformerFactory factory = TransformerFactory.newInstance();
-            URL resource = GFacUtils.class.getClassLoader().getResource(jobManagerConfiguration.getJobDescriptionTemplateName());
+            URL resource = ApplicationSettings.loadFile(jobManagerConfiguration.getJobDescriptionTemplateName());
 
             if (resource == null) {
                 String error = "System configuration file '" + jobManagerConfiguration.getJobDescriptionTemplateName()

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 8a2cc4e..4b40159 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -370,8 +370,8 @@ public class ProcessContext {
 
 	public void setProcessStatus(ProcessStatus status) {
 		if (status != null) {
-			log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
-					getProcessState().name(), status.getState().name());
+			log.info(String.format("expId: %s, processId: %s :- Process status changed %s -> %s", getExperimentId(), processId,
+					getProcessState().name(), status.getState().name()));
 			List<ProcessStatus> processStatuses = new ArrayList<>();
 			processStatuses.add(status);
 			processModel.setProcessStatuses(processStatuses);

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 6f95d3d..74e8b65 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -64,9 +64,9 @@ public class TaskContext {
 	}
 
 	public void setTaskStatus(TaskStatus taskStatus) {
-		log.info("expId: {}, processId: {}, taskId: {}, type: {}:- Task status changed {} -> {}", parentProcessContext
+		log.info(String.format("expId: %s, processId: %s, taskId: %s, type: %s : Task status changed %s -> %s", parentProcessContext
 				.getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
-				getTaskState().name(), taskStatus .getState().name());
+				getTaskState().name(), taskStatus .getState().name()));
 		List<TaskStatus> taskStatuses = new ArrayList<>();
 		taskStatuses.add(taskStatus);
 		taskModel.setTaskStatuses(taskStatuses);

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index a4dcb5d..e4757e8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -151,8 +151,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             jobDetails.setProcessId(taskContext.getProcessId());
             FactoryClient factory = new FactoryClient(eprt, secProperties);
 
-            log.info(String.format("Activity Submitting to %s ... \n",
-                    factoryUrl));
+            log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
             CreateActivityResponseDocument response = factory.createActivity(cad);
             log.info(String.format("Activity Submitted to %s \n", factoryUrl));
 
@@ -220,8 +219,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                 JobState applicationJobStatus = JobState.COMPLETE;
                 jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
                 GFacUtils.saveJobStatus(processContext, jobDetails);
-                log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(),
-                        activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString());
+                log.info(String.format("Job Id: %s, exit code: %s, exit status: %s", jobDetails.getJobId(),
+                        activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString()));
 
 //                if (activityStatus.getExitCode() == 0) {
 //                } else {
@@ -266,7 +265,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                         fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
                         URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
                         remoteFilePath = destinationURI.getPath();
-                        log.info("SCP local file :{} -> from remote :{}", localFilePath, remoteFilePath);
+                        log.info(String.format("SCP local file :%s -> from remote :%s", localFilePath, remoteFilePath));
                         SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
                         output.setValue(destinationURI.toString());
                         break;
@@ -313,7 +312,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                     remoteFilePath = remoteFileURI.getPath();
                     fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
                     localFilePath = pc.getInputDir() + File.separator + fileName;
-                    log.info("SCP remote file :{} -> to local :{}", remoteFilePath, localFilePath);
+                    log.info(String.format("SCP remote file :%s -> to local :%s", remoteFilePath, localFilePath));
                     SSHUtils.scpFrom(remoteFilePath, localFilePath, sshSession);
                     input.setValue("file:/" + localFilePath);
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b24aa75..bf2bdbb 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -91,7 +91,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         properties = new Properties();
         properties.put("mail.store.protocol", storeProtocol);
         timer = new Timer("CancelJobHandler", true);
-        long period = 1000*60*5; // five minute delay between successive task executions.
+        long period = 1000 * 60 * 5; // five minute delay between successive task executions.
         timer.schedule(new CancelTimerTask(), 0 , period);
     }
 
@@ -117,7 +117,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	}
 	@Override
 	public void monitor(String jobId, TaskContext taskContext) {
-		log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
+		log.info(String.format("[EJM]: Added monitor Id : %s to email based monitor map", jobId));
 		jobMonitorMap.put(jobId, taskContext);
         taskContext.getParentProcessContext().setPauseTaskExecution(true);
 	}
@@ -203,7 +203,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 					    continue;
 				    } else {
                         quite = false;
-					    log.info("[EJM]: " + jobMonitorMap.size() + " job/s in job monitor map");
+					    log.info(String.format("[EJM]: %d job/s in job monitor map", jobMonitorMap.size()));
 				    }
 				    if (!store.isConnected()) {
 					    store.connect();
@@ -267,13 +267,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
                 if (jobStatusResult.getJobId() != null) {
                     taskContext = jobMonitorMap.get(jobStatusResult.getJobId());
                 } else {
-                    log.info("Returned null for job id, message subject--> {}" , message.getSubject());
+                    log.info(String.format("Returned null for job id, message subject--> %s" , message.getSubject()));
                 }
                 if (taskContext == null) {
                     if (jobStatusResult.getJobName() != null) {
                         taskContext = jobMonitorMap.get(jobStatusResult.getJobName());
                     } else {
-                        log.info("Returned null for job name, message subject --> {}" , message.getSubject());
+                        log.info(String.format("Returned null for job name, message subject --> %s" , message.getSubject()));
                     }
                 }
                 if (taskContext != null) {
@@ -378,9 +378,9 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 			    log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
 			    GFacUtils.saveJobStatus(parentProcessContext, jobModel);
 		    } catch (GFacException e) {
-			    log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
-                        "status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
-                        .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
+			    log.error(String.format("expId: %s, processId: %s, taskId: %s, jobId: %s :- Error while save and publishing Job " +
+                        "status %s", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
+                        .getTaskId(), jobModel.getJobId(), jobStatus.getJobState()));
 		    }
 	    }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
index 4aecd57..fbad51c 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
@@ -91,7 +91,7 @@
 //            System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " +
 //                    "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory ");
 //        }
-//        URL resource = BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+//        URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
 //        assert resource != null;
 //        System.out.println(resource.getFile());
 //        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null);

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
index 4ca5684..9d352a4 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java
@@ -90,7 +90,7 @@
 //                    "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
 //            throw new Exception("Need my proxy user name password to run tests.");
 //        }
-//        URL resource = GSISSHProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+//        URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
 //        assert resource != null;
 //        System.out.println(resource.getFile());
 //        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null);

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
index 4aa0df1..bdbadda 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
@@ -54,7 +54,7 @@
 //    @Before
 //    public void setUp() throws Exception {
 //
-//    	URL resource = SSHProviderTestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+//    	URL resource = ApplicationSettings.loadFile(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
 //        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()),null);
 ////        gFacConfiguration.s
 //        //have to set InFlwo Handlers and outFlowHandlers

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index cd484bb..7394e62 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,6 +23,8 @@ package org.apache.airavata.gfac.server;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.AiravataStartupException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logging.MDCConstants;
+import org.apache.airavata.common.logging.MDCUtil;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
@@ -60,10 +62,12 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -152,13 +156,11 @@ public class GfacServerHandler implements GfacService.Iface {
      */
     public boolean submitProcess(String processId, String gatewayId, String tokenId) throws
             TException {
-        requestCount++;
-        log.info("-----------------------------------" + requestCount + "-----------------------------------------");
-        log.info(processId, "GFac Received submit job request for the Process: {} process: {}", processId,
-                processId);
-
+        MDC.put(MDCConstants.PROCESS_ID, processId);
+        MDC.put(MDCConstants.GATEWAY_ID, gatewayId);
+        MDC.put(MDCConstants.TOKEN_ID, tokenId);
         try {
-	        executorService.execute(new GFacWorker(processId, gatewayId, tokenId));
+	        executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId)));
         } catch (GFacException e) {
             log.error("Failed to submit process", e);
 
@@ -184,28 +186,29 @@ public class GfacServerHandler implements GfacService.Iface {
         }
 
 
-        public void onMessage(MessageContext message) {
-            log.info(" Message Received with message id '" + message.getMessageId()
-		            + "' and with message type '" + message.getType());
-            if (message.getType().equals(MessageType.LAUNCHPROCESS)) {
+        public void onMessage(MessageContext messageContext) {
+            MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId());
+            log.info(" Message Received with message id '" + messageContext.getMessageId()
+		            + "' and with message type '" + messageContext.getType());
+            if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) {
 	            ProcessStatus status = new ProcessStatus();
 	            status.setState(ProcessState.STARTED);
                 try {
                     ProcessSubmitEvent event = new ProcessSubmitEvent();
-                    TBase messageEvent = message.getEvent();
+                    TBase messageEvent = messageContext.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-	                if (message.isRedeliver()) {
+	                if (messageContext.isRedeliver()) {
 		                // check the process is already active in this instance.
 		                if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
 			                // update deliver tag
 			                try {
-				                updateDeliveryTag(curatorClient, gfacServerName, event, message );
+				                updateDeliveryTag(curatorClient, gfacServerName, event, messageContext );
 				                return;
 			                } catch (Exception e) {
 				                log.error("Error while updating delivery tag for redelivery message , messageId : " +
-						                message.getMessageId(), e);
-				                processLaunchSubscriber.sendAck(message.getDeliveryTag());
+						                messageContext.getMessageId(), e);
+				                processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
 			                }
 		                } else {
 			                // read process status from registry
@@ -220,8 +223,9 @@ public class GfacServerHandler implements GfacService.Iface {
 	                Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event
 			                .getProcessId());
 	                publishProcessStatus(event, status);
+                    MDC.put(MDCConstants.EXPERIMENT_ID, event.getExperimentId());
                     try {
-                        createProcessZKNode(curatorClient, gfacServerName, event, message);
+                        createProcessZKNode(curatorClient, gfacServerName, event, messageContext);
                         boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId());
                         if (isCancel) {
                             if (status.getState() == ProcessState.STARTED) {
@@ -237,7 +241,7 @@ public class GfacServerHandler implements GfacService.Iface {
                                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                 Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                                 publishProcessStatus(event, status);
-                                processLaunchSubscriber.sendAck(message.getDeliveryTag());
+                                processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                                 return;
                             } else {
                                 setCancelData(event.getExperimentId(),event.getProcessId());
@@ -246,7 +250,7 @@ public class GfacServerHandler implements GfacService.Iface {
                         submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
-                        processLaunchSubscriber.sendAck(message.getDeliveryTag());
+                        processLaunchSubscriber.sendAck(messageContext.getDeliveryTag());
                     }
                 } catch (TException e) {
                     log.error(e.getMessage(), e); //nobody is listening so nothing to throw
@@ -254,6 +258,8 @@ public class GfacServerHandler implements GfacService.Iface {
                     log.error("Error while updating experiment status", e);
                 } catch (AiravataException e) {
 	                log.error("Error while publishing process status", e);
+                } finally {
+                    MDC.clear();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 725a0b1..5eeeb30 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -23,6 +23,8 @@ package org.apache.airavata.orchestrator.server;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logging.MDCConstants;
+import org.apache.airavata.common.logging.MDCUtil;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
@@ -67,6 +69,7 @@ import org.apache.thrift.TException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.util.*;
 
@@ -139,7 +142,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
             experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
             if (experiment == null) {
-                log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
+                log.error("Experiment ID should not be NULL");
                 return false;
             }
 
@@ -225,7 +228,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
                 log.info("expId: {}, Launched experiment ", experimentId);
-                OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token, gatewayId));
+                OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId)));
             } else if (executionType == ExperimentType.WORKFLOW) {
                 //its a workflow execution experiment
                 log.debug(experimentId, "Launching workflow experiment {}.", experimentId);
@@ -591,7 +594,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
 		@Override
 		public void onMessage(MessageContext messageContext) {
-
+			MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId());
 			switch (messageContext.getType()) {
 				case EXPERIMENT:
 					launchExperiment(messageContext);
@@ -604,6 +607,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 					log.error("Orchestrator got un-support message type : " + messageContext.getType());
 					break;
 			}
+			MDC.clear();
 		}
 
 		private void cancelExperiment(MessageContext messageContext) {
@@ -611,6 +615,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 				byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
 				ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
 				ThriftUtils.createThriftFromBytes(bytes, expEvent);
+				log.info(String.format("Cancelling experiment with experimentId: %s gateway Id: %s", expEvent.getExperimentId(), expEvent.getGatewayId()));
 				terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
 			} catch (TException e) {
 				log.error("Experiment cancellation failed due to Thrift conversion error", e);
@@ -622,13 +627,16 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	}
 
 	private void launchExperiment(MessageContext messageContext) {
+		ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
 		try {
-            byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
-            ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
-            ThriftUtils.createThriftFromBytes(bytes, expEvent);
-            if (messageContext.isRedeliver()) {
+			byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+			ThriftUtils.createThriftFromBytes(bytes, expEvent);
+			MDC.put(MDCConstants.EXPERIMENT_ID, expEvent.getExperimentId());
+			log.info(String.format("Launching experiment with experimentId: %s gateway Id: %s", expEvent.getExperimentId(), expEvent.getGatewayId()));
+			if (messageContext.isRedeliver()) {
 				ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.
 						get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId());
+				MDC.put(MDCConstants.EXPERIMENT_NAME, experimentModel.getExperimentName());
 				if (experimentModel.getExperimentStatus().get(0).getState() == ExperimentState.CREATED) {
 					launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
 				}
@@ -636,11 +644,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
             }
 		} catch (TException e) {
-            log.error("Experiment launch failed due to Thrift conversion error", e);
+			String logMessage =  expEvent.getExperimentId() != null && expEvent.getGatewayId() != null ?
+					String.format("Experiment launch failed due to Thrift conversion error, experimentId: %s, gatewayId: %s",
+					expEvent.getExperimentId(), expEvent.getGatewayId()): "Experiment launch failed due to Thrift conversion error";
+            log.error(logMessage,  e);
 		} catch (RegistryException e) {
-			log.error("Experiment launch failed due to registry access issue", e);
+			String logMessage =  expEvent.getExperimentId() != null && expEvent.getGatewayId() != null ?
+					String.format("Experiment launch failed due to registry access issue, experimentId: %s, gatewayId: %s",
+					expEvent.getExperimentId(), expEvent.getGatewayId()): "Experiment launch failed due to registry access issue";
+			log.error(logMessage, e);
 		}finally {
 			experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+			MDC.clear();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/registry/registry-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/registry/registry-core/pom.xml b/modules/registry/registry-core/pom.xml
index 1bea32f..c2b26b2 100644
--- a/modules/registry/registry-core/pom.xml
+++ b/modules/registry/registry-core/pom.xml
@@ -102,33 +102,6 @@
 
     <build>
         <plugins>
-            <plugin>
-                <groupId>org.apache.openjpa</groupId>
-                <artifactId>openjpa-maven-plugin</artifactId>
-                <version>2.2.0</version>
-                <configuration>
-                    <includes>**/entities/*.class</includes>
-                    <excludes>**/entities/XML*.class</excludes>
-                    <addDefaultConstructor>true</addDefaultConstructor>
-                    <enforcePropertyRestrictions>true</enforcePropertyRestrictions>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>enhancer</id>
-                        <phase>process-classes</phase>
-                        <goals>
-                            <goal>enhance</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.openjpa</groupId>
-                        <artifactId>openjpa</artifactId>
-                        <version>2.2.0</version>
-                    </dependency>
-                </dependencies>
-            </plugin>
             <!--<plugin>-->
                 <!--<groupId>org.apache.maven.plugins</groupId>-->
                 <!--<artifactId>maven-antrun-plugin</artifactId>-->

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 99387de..1c0483d 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -20,13 +20,18 @@
  */
 package org.apache.airavata.server;
 
+import ch.qos.logback.classic.LoggerContext;
+import org.apache.airavata.api.Airavata;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logging.kafka.KafkaAppender;
 import org.apache.airavata.common.utils.*;
 import org.apache.airavata.common.utils.ApplicationSettings.ShutdownStrategy;
 import org.apache.airavata.common.utils.IServer.ServerStatus;
 import org.apache.airavata.common.utils.StringUtil.CommandLineParameters;
 import org.apache.commons.cli.ParseException;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.slf4j.ILoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,15 +159,41 @@ public class ServerMain {
 //		});
 //	}
 	
-	public static void main(String args[]) throws ParseException, IOException {
-        CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args);
+	/** Applies command-line settings, attaches the Kafka log appender when enabled, then stops or starts the server. */
+	public static void main(String args[]) throws ParseException, IOException, AiravataException {
+		ServerSettings.mergeSettingsCommandLineArgs(args);
+		ServerSettings.setServerRoles(ApplicationSettings.getSetting(SERVERS_KEY, "all").split(","));
+		// Kafka logging is wired programmatically and needs logback's LoggerContext; any other SLF4J binding is fatal.
+		if (ServerSettings.isEnabledKafkaLogging()) {
+			final ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
+			if (iLoggerFactory instanceof LoggerContext) {
+				final KafkaAppender kafkaAppender = new KafkaAppender(ServerSettings.getKafkaBrokerList(), ServerSettings.getKafkaTopicPrefix());
+				kafkaAppender.setContext((LoggerContext) iLoggerFactory);
+				kafkaAppender.setName("kafka-appender");
+				kafkaAppender.clearAllFilters();
+				kafkaAppender.start();
+				// Until AIRAVATA-2073: attach to selected loggers only (not root) so org.apache.kafka logs are left out
+				((LoggerContext) iLoggerFactory).getLogger("org.apache.airavata").addAppender(kafkaAppender);
+				((LoggerContext) iLoggerFactory).getLogger("org.apache.zookeeper").addAppender(kafkaAppender);
+				((LoggerContext) iLoggerFactory).getLogger("org.apache.derby").addAppender(kafkaAppender);
+				((LoggerContext) iLoggerFactory).getLogger("org.apache.commons").addAppender(kafkaAppender);
+				((LoggerContext) iLoggerFactory).getLogger("org.apache.thrift").addAppender(kafkaAppender);
+				((LoggerContext) iLoggerFactory).getLogger("com").addAppender(kafkaAppender);
+			} else {
+				logger.warn("Kafka logging is enabled but cannot find logback LoggerContext, found {}", iLoggerFactory.getClass());
+				throw new AiravataException("Kafka logging is enabled but cannot find logback LoggerContext");
+			}
+		} else {
+			logger.info("Kafka logging is disabled in airavata server configurations");
+		}
+		// A stop command on the command line short-circuits startup; otherwise embedded ZooKeeper starts first.
+		CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args);
         if (commandLineParameters.getArguments().contains(STOP_COMMAND_STR)){
             performServerStopRequest(commandLineParameters);
         }else{
             AiravataZKUtils.startEmbeddedZK(cnxnFactory);
             performServerStart(args);
 		}
-
     }
 
 
@@ -173,7 +204,6 @@ public class ServerMain {
 		for (String string : args) {
 			logger.info("Server Arguments: " + string);
 		}
-		ServerSettings.mergeSettingsCommandLineArgs(args);
 		String serverNames;
 		try {
 			serverNames = ApplicationSettings.getSetting(SERVERS_KEY);

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java
index f7b8e1f..9df1745 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/utils/PropertyReader.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.testsuite.multitenantedairavata.utils;
 
+import org.apache.airavata.common.utils.ApplicationSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +42,7 @@ public class PropertyReader {
     }
 
     protected void loadProperties() throws IOException {
-        URL airavataURL = PropertyFileType.class.getClassLoader().getResource(TestFrameworkConstants.AIRAVATA_CLIENT_PROPERTIES);
+        URL airavataURL = ApplicationSettings.loadFile(TestFrameworkConstants.AIRAVATA_CLIENT_PROPERTIES);
         if (airavataURL != null){
             airavataClientProperties.load(airavataURL.openStream());
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
----------------------------------------------------------------------
diff --git a/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java b/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
index 83af52b..d4b1863 100644
--- a/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
+++ b/modules/user-profile/user-profile-core/src/main/java/org/apache/airavata/userprofile/core/AbstractThriftDeserializer.java
@@ -91,8 +91,7 @@ public abstract class AbstractThriftDeserializer<E extends TFieldIdEnum, T exten
             // Validate that the instance contains all required fields.
             validate(instance);
         } catch (final TException e) {
-            log.error(String.format("Unable to deserialize JSON '%s' to type '%s'.",
-                    jp.getValueAsString(), instance.getClass().getName(), e));
+            log.error(String.format("Unable to deserialize JSON %s to type %s.", jp.getValueAsString(), instance.getClass().getName()), e);
             ctxt.mappingException(e.getMessage());
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/342400cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3988fd0..a65df09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,8 @@
         <snakeyaml.version>1.15</snakeyaml.version>
 		<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
 		<maven.replacer.plugin.version>1.5.3</maven.replacer.plugin.version>
+		<kafka-clients.version>0.8.2.2</kafka-clients.version>
+		<logback.version>1.1.6</logback.version>
 	</properties>
 
 	<developers>
@@ -434,6 +436,16 @@
 				<artifactId>curator-framework</artifactId>
 				<version>${curator.version}</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.kafka</groupId>
+				<artifactId>kafka-clients</artifactId>
+				<version>${kafka-clients.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>ch.qos.logback</groupId>
+				<artifactId>logback-classic</artifactId>
+				<version>${logback.version}</version>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>
 
@@ -480,6 +492,39 @@
 			<build>
 				<plugins>
 					<plugin>
+						<groupId>org.codehaus.mojo</groupId>
+						<artifactId>templating-maven-plugin</artifactId>
+						<version>1.0.0</version>
+						<executions>
+							<execution>
+								<id>filtering-java-templates</id>
+								<goals>
+									<goal>filter-sources</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+					<plugin>
+						<groupId>com.lukegb.mojo</groupId>
+						<artifactId>gitdescribe-maven-plugin</artifactId>
+						<version>3.0</version>
+						<executions>
+							<execution>
+								<goals>
+									<goal>gitdescribe</goal>
+								</goals>
+								<id>git-describe</id>
+								<phase>initialize</phase>
+								<configuration>
+									<extraArguments>
+										<param>--tags</param>
+									</extraArguments>
+									<descriptionProperty>git-describe</descriptionProperty>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+					<plugin>
 						<groupId>org.apache.maven.plugins</groupId>
 						<artifactId>maven-remote-resources-plugin</artifactId>
 						<executions>
@@ -542,6 +587,26 @@
 							</excludes>
 						</configuration>
 					</plugin>
+					<plugin>
+						<groupId>com.lukegb.mojo</groupId>
+						<artifactId>gitdescribe-maven-plugin</artifactId>
+						<version>3.0</version>
+						<executions>
+							<execution>
+								<goals>
+									<goal>gitdescribe</goal>
+								</goals>
+								<id>git-describe</id>
+								<phase>initialize</phase>
+								<configuration>
+									<extraArguments>
+										<param>--tags</param>
+									</extraArguments>
+									<descriptionProperty>git-describe</descriptionProperty>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
 				</plugins>
 			</build>
 			<activation>