You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/06/27 23:36:46 UTC

[20/34] ambari git commit: AMBARI-17355 & AMBARI-17354: POC: FE & BE changes for first class support for Yarn hosted services

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/pom.xml b/contrib/views/hive-next/pom.xml
new file mode 100644
index 0000000..c571352
--- /dev/null
+++ b/contrib/views/hive-next/pom.xml
@@ -0,0 +1,369 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.ambari.contrib.views</groupId>
+  <artifactId>hive-jdbc</artifactId>
+  <version>2.0.0.0-SNAPSHOT</version>
+  <name>Hive Jdbc</name>
+
+  <parent>
+    <groupId>org.apache.ambari.contrib.views</groupId>
+    <artifactId>ambari-contrib-views</artifactId>
+    <version>2.4.0.0.0</version>
+  </parent>
+
+  <repositories>
+    <repository>
+      <id>repository.apache.org</id>
+      <name>Apache Snapshot repo</name>
+      <url>https://repository.apache.org/content/groups/snapshots</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.jayway.jsonpath</groupId>
+      <artifactId>json-path</artifactId>
+      <version>2.0.0</version>
+    </dependency>
+      <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.contribs</groupId>
+      <artifactId>jersey-multipart</artifactId>
+      <version>1.18</version>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-client</artifactId>
+      <version>1.8</version>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+      <version>1.18.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+      <version>1.9</version>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>1.6</version>
+    </dependency>
+    <!--
+      org.apache.commons:commons-csv is declared exactly once, further down
+      in this dependency list (version 1.1). Declaring it here as well would
+      trigger Maven's "duplicate declaration" model warning.
+    -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <version>4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-views</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>javax.servlet-api</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>tomcat</groupId>
+                <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+        <exclusions>
+            <exclusion>
+                <groupId>tomcat</groupId>
+                <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>javax.ws.rs-api</artifactId>
+      <version>2.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_2.11</artifactId>
+      <version>2.3.15</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari.contrib.views</groupId>
+      <artifactId>ambari-views-utils</artifactId>
+      <version>2.4.0.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-validator</groupId>
+      <artifactId>commons-validator</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>4.4.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_2.11</artifactId>
+      <version>2.3.15</version>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
+    <hive-version>2.1.0-SNAPSHOT</hive-version>
+    <ambari.version>1.3.0.0-SNAPSHOT</ambari.version>
+  </properties>
+  <build>
+    <plugins>
+
+      <!-- Building frontend -->
+      <plugin>
+        <groupId>com.github.eirslett</groupId>
+        <artifactId>frontend-maven-plugin</artifactId>
+        <version>0.0.16</version>
+        <configuration>
+          <nodeVersion>v0.12.2</nodeVersion>
+          <npmVersion>1.4.8</npmVersion>
+          <workingDirectory>src/main/resources/ui/hive-web/</workingDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <id>install node and npm</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>install-node-and-npm</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>npm install</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>npm</goal>
+            </goals>
+            <configuration>
+              <arguments>install --python="${project.basedir}/../src/main/unix/ambari-python-wrap" --unsafe-perm</arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>exec-maven-plugin</artifactId>
+        <groupId>org.codehaus.mojo</groupId>
+        <version>1.3.2</version>
+        <executions>
+          <execution>
+            <id>Hive build</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <workingDirectory>${basedir}/src/main/resources/ui/hive-web</workingDirectory>
+              <executable>node/node</executable>
+              <arguments>
+                <argument>node_modules/.bin/ember</argument>
+                <argument>build</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+       <groupId>org.vafer</groupId>
+       <artifactId>jdeb</artifactId>
+       <version>1.0.1</version>
+       <executions>
+           <execution>
+               <phase>none</phase>
+               <goals>
+                   <goal>jdeb</goal>
+               </goals>
+           </execution>
+       </executions>
+       <configuration>
+           <submodules>false</submodules>
+       </configuration>
+     </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>false</filtering>
+        <includes>
+          <include>META-INF/**/*</include>
+          <include>view.xml</include>
+          <include>view.log4j.properties</include>
+          <include>application.conf</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>src/main/resources/ui/hive-web/dist</directory>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>src/main/resources/ui/hive-web/bower_components/polestar</directory>
+        <filtering>false</filtering>
+        <targetPath>polestar</targetPath>
+      </resource>
+      <resource>
+        <directory>src/main/resources/ui/hive-web/bower_components/voyager</directory>
+        <filtering>false</filtering>
+        <targetPath>voyager</targetPath>
+      </resource>
+      <resource>
+        <targetPath>WEB-INF/lib</targetPath>
+        <filtering>false</filtering>
+        <directory>target/lib</directory>
+      </resource>
+    </resources>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
new file mode 100644
index 0000000..773aa55
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+
+/**
+ * Base class shared by the view's services. It carries the injected
+ * {@link ViewContext}, a common logger and a lazily built
+ * {@link SharedObjectsFactory}.
+ */
+public class BaseService {
+  protected static final Logger LOG = LoggerFactory.getLogger(BaseService.class);
+
+  @Inject
+  protected ViewContext context;
+
+  private SharedObjectsFactory sharedObjectsFactory;
+
+  public BaseService() {
+    // Nothing to initialise eagerly; the factory is built on first use.
+  }
+
+  /**
+   * Returns the factory held by this service, building it from the current
+   * {@code context} when none is set yet.
+   *
+   * @return the shared objects factory, created on demand
+   */
+  public SharedObjectsFactory getSharedObjectsFactory() {
+    if (sharedObjectsFactory != null) {
+      return sharedObjectsFactory;
+    }
+    sharedObjectsFactory = new SharedObjectsFactory(context);
+    return sharedObjectsFactory;
+  }
+
+  /**
+   * Replaces the factory returned by {@link #getSharedObjectsFactory()}.
+   *
+   * @param sharedObjectsFactory factory to hand out from now on
+   */
+  public void setSharedObjectsFactory(SharedObjectsFactory sharedObjectsFactory) {
+    this.sharedObjectsFactory = sharedObjectsFactory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
new file mode 100644
index 0000000..918dc68
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive2.actor.message.DDLJob;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.internal.HiveResult;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Contract for running Hive work over a {@link HiveConnection} and for
+ * reaching the result set / statement the delegate currently holds.
+ *
+ * NOTE(review): the conditions under which the returned {@code Optional}s are
+ * absent are decided by the implementation - confirm against it.
+ */
+public interface ConnectionDelegate {
+  /**
+   * Executes the given DDL job on the connection.
+   * NOTE(review): presumably the non-blocking counterpart of
+   * {@link #executeSync} - confirm against the implementation.
+   */
+  Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException;
+
+  /** Executes the given DDL job on the connection (see the note on {@link #execute}). */
+  Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException;
+
+  /** Fetches column metadata as described by the given job, using the connection. */
+  Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException;
+
+  /** Returns the result set the delegate currently holds, if any. */
+  Optional<ResultSet> getCurrentResultSet();
+
+  /** Returns the statement the delegate currently holds, if any. */
+  Optional<HiveStatement> getCurrentStatement();
+
+  /** Closes the current result set. */
+  void closeResultSet();
+
+  /** Closes the current statement. */
+  void closeStatement();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
new file mode 100644
index 0000000..2f7ffe0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.client.ConnectionConfig;
+
+import java.util.List;
+
+/**
+ * Builds the {@link ConnectionConfig} (user name, empty password and
+ * HiveServer2 JDBC URL) for a view instance.
+ *
+ * The URL comes from one of three places:
+ * <ul>
+ *   <li>no cluster associated with the view: the {@code hive.jdbc.url} view property;</li>
+ *   <li>cluster with dynamic service discovery enabled: the ZooKeeper quorum and namespace;</li>
+ *   <li>otherwise: the HiveServer2 hosts and the port of the configured transport mode.</li>
+ * </ul>
+ * In every case the optional {@code hive.session.params} view property is
+ * appended only when it is actually set.
+ */
+public class ConnectionFactory {
+
+  private static final String ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY = "hive.server2.support.dynamic.service.discovery";
+  private static final String ZK_HIVE_NAMESPACE_KEY = "hive.server2.zookeeper.namespace";
+  private static final String ZK_HIVE_QUORUM = "hive.zookeeper.quorum";
+
+  private static final String AMBARI_HIVE_SERVICE_NAME = "HIVE";
+  private static final String AMBARI_HIVESERVER_COMPONENT_NAME = "HIVE_SERVER";
+
+  private static final String HIVE_SITE = "hive-site";
+  private static final String HIVE_INTERACTIVE_SITE = "hive-interactive-site";
+
+  private static final String HIVE_JDBC_URL_KEY = "hive.jdbc.url";
+  private static final String HIVE_SESSION_PARAMS = "hive.session.params";
+
+  private static final String BINARY_PORT_KEY = "hive.server2.thrift.port";
+  private static final String HTTP_PORT_KEY = "hive.server2.thrift.http.port";
+  private static final String HIVE_TRANSPORT_MODE_KEY = "hive.server2.transport.mode";
+  private static final String HTTP_PATH_KEY = "hive.server2.thrift.http.path";
+
+  private static final String BINARY_TRANSPORT_MODE = "binary";
+
+  /**
+   * Creates the connection configuration for the given view context.
+   *
+   * @param context view context supplying the cluster (may be absent), the
+   *                view properties and the current user name
+   * @return configuration holding the user name, an empty password and the JDBC URL
+   */
+  public static ConnectionConfig create(ViewContext context) {
+    String jdbcUrl;
+    if (context.getCluster() == null) {
+      jdbcUrl = getConnectFromCustom(context);
+    } else if (zookeeperConfigured(context)) {
+      jdbcUrl = getFromClusterZookeeperConfig(context);
+    } else {
+      jdbcUrl = getFromHiveConfiguration(context);
+    }
+
+    String userName = context.getUsername();
+    return new ConnectionConfig(userName, "", jdbcUrl);
+  }
+
+  /**
+   * Builds a direct URL from the HiveServer2 hosts known to the cluster and the
+   * port matching the transport mode configured in hive-site.
+   */
+  private static String getFromHiveConfiguration(ViewContext context) {
+    String transportMode = context.getCluster().getConfigurationValue(HIVE_SITE, HIVE_TRANSPORT_MODE_KEY);
+    String binaryPort = context.getCluster().getConfigurationValue(HIVE_SITE, BINARY_PORT_KEY);
+    String httpPort = context.getCluster().getConfigurationValue(HIVE_SITE, HTTP_PORT_KEY);
+    String pathKey = context.getCluster().getConfigurationValue(HIVE_SITE, HTTP_PATH_KEY);
+    List<String> hiveHosts = context.getCluster().getHostsForServiceComponent(AMBARI_HIVE_SERVICE_NAME, AMBARI_HIVESERVER_COMPONENT_NAME);
+    String sessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+
+    // An absent transport mode is treated as binary (Hive's documented default)
+    // rather than failing with a NullPointerException.
+    boolean isBinary = transportMode == null || BINARY_TRANSPORT_MODE.equalsIgnoreCase(transportMode);
+    final String port = isBinary ? binaryPort : httpPort;
+
+    List<String> hostPorts = FluentIterable.from(hiveHosts).transform(new Function<String, String>() {
+      @Override
+      public String apply(String input) {
+        return input + ":" + port;
+      }
+    }).toList();
+
+    String concatHostPorts = Joiner.on(",").join(hostPorts);
+
+    StringBuilder builder = new StringBuilder();
+    builder.append(withSessionParams("jdbc:hive2://" + concatHostPorts, sessionParams));
+
+    if (!isBinary) {
+      builder.append(";").append("transportMode=http;httpPath=").append(pathKey);
+    }
+
+    return builder.toString();
+  }
+
+  /**
+   * Builds a ZooKeeper service-discovery URL; quorum and namespace are read
+   * from hive-site first and from hive-interactive-site as a fallback.
+   */
+  private static String getFromClusterZookeeperConfig(ViewContext context) {
+    String quorum = context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_QUORUM);
+    if (quorum == null) {
+      quorum = context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_QUORUM);
+    }
+
+    String namespace = context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_NAMESPACE_KEY);
+    if (namespace == null) {
+      namespace = context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_NAMESPACE_KEY);
+    }
+
+    String sessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+
+    String formatted = String.format("jdbc:hive2://%s/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=%s", quorum, namespace);
+    return withSessionParams(formatted, sessionParams);
+  }
+
+  /**
+   * Tells whether dynamic service discovery is switched on in hive-site or
+   * hive-interactive-site; a missing value counts as {@code false}.
+   */
+  private static boolean zookeeperConfigured(ViewContext context) {
+    boolean fromHiveSite = Boolean.parseBoolean(context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY));
+    boolean fromHiveInteractiveSite = Boolean.parseBoolean(context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY));
+    return fromHiveInteractiveSite || fromHiveSite;
+  }
+
+  /**
+   * Uses the JDBC URL given in the view properties. The URL itself is not
+   * validated here; it is returned as configured (plus session parameters).
+   */
+  private static String getConnectFromCustom(ViewContext context) {
+    String jdbcUrl = context.getProperties().get(HIVE_JDBC_URL_KEY);
+    String hiveSessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+    return withSessionParams(jdbcUrl, hiveSessionParams);
+  }
+
+  /**
+   * Appends {@code ;sessionParams} to the URL, or returns the URL untouched
+   * when no session parameters are configured, so that an unset property never
+   * ends up as the literal text "null" (or a dangling ';') in the URL.
+   */
+  private static String withSessionParams(String url, String sessionParams) {
+    if (Strings.isNullOrEmpty(sessionParams)) {
+      return url;
+    }
+    return url + ";" + sessionParams;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
new file mode 100644
index 0000000..f026ea6
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.actor.DeathWatch;
+import org.apache.ambari.view.hive2.actor.OperationController;
+import org.apache.ambari.view.hive2.internal.ConnectionSupplier;
+import org.apache.ambari.view.hive2.internal.DataStorageSupplier;
+import org.apache.ambari.view.hive2.internal.HdfsApiSupplier;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Singleton holder of the view's Akka {@link ActorSystem} and of the
+ * operation-controller actors, one per view instance name.
+ */
+public class ConnectionSystem {
+
+  private static final String ACTOR_CONF_FILE = "application.conf";
+  private static final String ACTOR_SYSTEM_NAME = "HiveViewActorSystem";
+  private static final Object lock = new Object();
+  private static volatile ConnectionSystem instance = null;
+
+  // Looked up without holding the lock in getOperationController(), so it has
+  // to be a concurrent map; a plain HashMap is not safe to read while another
+  // thread is inserting. Note that a concurrent map rejects null keys.
+  private static final ConcurrentMap<String, ActorRef> operationControllerMap = new ConcurrentHashMap<>();
+
+  private final ActorSystem actorSystem;
+
+  private ConnectionSystem() {
+    this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);
+  }
+
+  /**
+   * Returns the single ConnectionSystem, creating it (and its actor system)
+   * on first use.
+   *
+   * @return the shared instance
+   */
+  public static ConnectionSystem getInstance() {
+    if (instance == null) {
+      synchronized (lock) {
+        if (instance == null) {
+          instance = new ConnectionSystem();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /** Creates a DeathWatch actor and an OperationController actor wired to it. */
+  private ActorRef createOperationController() {
+    ActorRef deathWatch = actorSystem.actorOf(Props.create(DeathWatch.class));
+    return actorSystem.actorOf(
+      Props.create(OperationController.class, actorSystem, deathWatch,
+        new ConnectionSupplier(), new DataStorageSupplier(), new HdfsApiSupplier()));
+  }
+
+  public ActorSystem getActorSystem() {
+    return actorSystem;
+  }
+
+  /**
+   * Returns one operationController per View Instance, creating it on the
+   * first request for that instance name.
+   *
+   * @param context view context whose instance name identifies the controller;
+   *                the instance name must not be null
+   * @return operationController Instance
+   */
+  public ActorRef getOperationController(ViewContext context) {
+    String instanceName = context.getInstanceName();
+    ActorRef ref = operationControllerMap.get(instanceName);
+    if (ref == null) {
+      // Creation is serialised so that only one controller (and one
+      // DeathWatch) is ever spawned per instance name.
+      synchronized (lock) {
+        ref = operationControllerMap.get(instanceName);
+        if (ref == null) {
+          ref = createOperationController();
+          operationControllerMap.put(instanceName, ref);
+        }
+      }
+    }
+    return ref;
+  }
+
+  /** Shuts the actor system down unless it has already terminated. */
+  public void shutdown() {
+    if (!actorSystem.isTerminated()) {
+      actorSystem.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
new file mode 100644
index 0000000..b0316ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.hive2.resources.files.FileService;
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.ATSParserFactory;
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.ATSRequestsDelegateImpl;
+import org.json.simple.JSONObject;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Help service: reports the view version and exposes smoke-test endpoints
+ * for HDFS, ATS and the view's storage.
+ */
+public class HelpService extends BaseService {
+  // The ViewContext is the injected, protected field inherited from
+  // BaseService. It is deliberately not redeclared here, so the endpoints
+  // below and getSharedObjectsFactory() read one and the same field.
+
+  @Inject
+  protected ViewResourceHandler handler;
+
+  /**
+   * Constructor
+   */
+  public HelpService() {
+    super();
+  }
+
+  /**
+   * Version
+   * @return version
+   */
+  @GET
+  @Path("/version")
+  @Produces(MediaType.TEXT_PLAIN)
+  public Response version(){
+    return Response.ok("0.0.1-SNAPSHOT").build();
+  }
+
+  // ================================================================================
+  // Smoke tests
+  // ================================================================================
+
+  /**
+   * HDFS Status
+   * @return OK response once the HDFS smoke test has returned
+   */
+  @GET
+  @Path("/hdfsStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response hdfsStatus(){
+    FileService.hdfsSmokeTest(context);
+    return getOKResponse();
+  }
+
+  /**
+   * ATS Status
+   * @return OK response once the ATS status check has returned
+   * @throws WebApplicationException wrapping any exception raised by the check
+   */
+  @GET
+  @Path("/atsStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response atsStatus() {
+    try {
+      ATSRequestsDelegateImpl atsimpl = new ATSRequestsDelegateImpl(context, ATSParserFactory.getATSUrl(context));
+      atsimpl.checkATSStatus();
+      return getOKResponse();
+    } catch (Exception e) {
+      throw new WebApplicationException(e);
+    }
+  }
+
+  /**
+   * Builds the JSON body shared by the successful smoke tests:
+   * message "OK", a null trace and status "200".
+   */
+  private Response getOKResponse() {
+    JSONObject response = new JSONObject();
+    response.put("message", "OK");
+    response.put("trace", null);
+    response.put("status", "200");
+    return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
+  }
+
+  /**
+   * Storage smoke test: stores a sample {@code TestBean} through the shared
+   * objects factory's storage.
+   * @return plain-text "OK" once the bean has been stored
+   */
+  @GET
+  @Path("/test")
+  @Produces(MediaType.TEXT_PLAIN)
+  public Response testStorage(){
+    TestBean test = new TestBean();
+    test.someData = "hello world";
+    getSharedObjectsFactory().getStorage().store(TestBean.class, test);
+    return Response.ok("OK").build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
new file mode 100644
index 0000000..e8d3333
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive2.actor.message.DDLJob;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.actor.message.job.Result;
+import org.apache.ambari.view.hive2.internal.HiveResult;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class HiveJdbcConnectionDelegate implements ConnectionDelegate {
+
+  private ResultSet currentResultSet;
+  private HiveStatement currentStatement;
+  private String atsGuid;
+
+  @Override
+  public Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException {
+
+    try {
+      Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
+      currentStatement = (HiveStatement) statement;
+
+      for (String syncStatement : job.getSyncStatements()) {
+        // we don't care about the result
+        // fail all if one fails
+        statement.execute(syncStatement);
+      }
+
+      HiveStatement hiveStatement = (HiveStatement) statement;
+      boolean result = hiveStatement.executeAsync(job.getAsyncStatement());
+      atsGuid = hiveStatement.getYarnATSGuid();
+      if (result) {
+        // query has a result set
+        ResultSet resultSet = hiveStatement.getResultSet();
+        currentResultSet = resultSet;
+        Optional<ResultSet> resultSetOptional = Optional.of(resultSet);
+        return resultSetOptional;
+
+      }
+      return Optional.absent();
+
+    } catch (SQLException e) {
+      // Close the statement on any error
+      currentStatement.close();
+      throw e;
+    }
+  }
+
+  @Override
+  public Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException {
+    try {
+      Statement statement = connection.createStatement();
+      currentStatement = (HiveStatement) statement;
+
+      boolean hasResultSet = false;
+      for (String syncStatement : job.getStatements()) {
+        // we don't care about the result
+        // fail all if one fails
+        hasResultSet = statement.execute(syncStatement);
+      }
+
+      if (hasResultSet) {
+        ResultSet resultSet = statement.getResultSet();
+        //HiveResult result = new HiveResult(resultSet);
+        return Optional.of(resultSet);
+      } else {
+        return Optional.absent();
+      }
+    } catch (SQLException e) {
+      // Close the statement on any error
+      currentStatement.close();
+      throw e;
+    }
+  }
+
+
+  @Override
+  public Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    ResultSet resultSet = metaData.getColumns("", job.getSchemaPattern(), job.getTablePattern(), job.getColumnPattern());
+    currentResultSet = resultSet;
+    return Optional.of(resultSet);
+  }
+
+  @Override
+  public Optional<ResultSet> getCurrentResultSet() {
+    return Optional.fromNullable(currentResultSet);
+  }
+
+  @Override
+  public Optional<HiveStatement> getCurrentStatement() {
+    return Optional.fromNullable(currentStatement);
+  }
+
+  @Override
+  public void closeResultSet() {
+
+    try {
+      if (currentResultSet != null) {
+        currentResultSet.close();
+      }
+    } catch (SQLException e) {
+      // Cannot do anything here
+    }
+  }
+
+  @Override
+  public void closeStatement()  {
+    try {
+      if (currentStatement != null) {
+        currentStatement.close();
+      }
+    } catch (SQLException e) {
+      // cannot do anything here
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
new file mode 100644
index 0000000..695c0ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.View;
+import org.apache.ambari.view.ViewDefinition;
+import org.apache.ambari.view.ViewInstanceDefinition;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+import org.apache.ambari.view.utils.UserLocal;
+
+
+/**
+ * View lifecycle hooks for the Hive view. Deploy and create need no work;
+ * destroy and update drop the caches kept per view instance.
+ */
+public class HiveViewImpl implements View {
+  @Override
+  public void onDeploy(ViewDefinition definition) {
+    // nothing to do on deploy
+  }
+
+  @Override
+  public void onCreate(ViewInstanceDefinition definition) {
+    // nothing to do on create
+  }
+
+  @Override
+  public void onDestroy(ViewInstanceDefinition definition) {
+    final String instanceName = definition.getInstanceName();
+    SharedObjectsFactory.dropInstanceCache(instanceName);
+  }
+
+  @Override
+  public void onUpdate(ViewInstanceDefinition definition) {
+    // drop everything cached for the instance: user-local objects first,
+    // then the shared objects
+    final String instanceName = definition.getInstanceName();
+    UserLocal.dropInstanceCache(instanceName);
+    SharedObjectsFactory.dropInstanceCache(instanceName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
new file mode 100644
index 0000000..e406366
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewInstanceDefinition;
+import org.apache.ambari.view.utils.ambari.ValidatorUtils;
+import org.apache.ambari.view.validation.ValidationResult;
+import org.apache.ambari.view.validation.Validator;
+
+public class PropertyValidator implements Validator {
+
+  public static final String WEBHDFS_URL = "webhdfs.url";
+  public static final String HIVE_PORT = "hive.port";
+  public static final String YARN_ATS_URL = "yarn.ats.url";
+  public static final String HIVE_SESSION_PARAMS = "hive.session.params";
+
+  @Override
+  public ValidationResult validateInstance(ViewInstanceDefinition viewInstanceDefinition, ValidationContext validationContext) {
+    return null;
+  }
+
+  @Override
+  public ValidationResult validateProperty(String property, ViewInstanceDefinition viewInstanceDefinition, ValidationContext validationContext) {
+    // Validate non cluster associated properties
+    if (property.equals(HIVE_SESSION_PARAMS)) {
+      String auth = viewInstanceDefinition.getPropertyMap().get(HIVE_SESSION_PARAMS);
+
+      if (auth != null && !auth.isEmpty()) {
+        for(String param : auth.split(";")) {
+          String[] keyvalue = param.split("=");
+          if (keyvalue.length != 2) {
+            return new InvalidPropertyValidationResult(false, "Can not parse session param " + param + " in " + auth);
+          }
+        }
+      }
+    }
+
+    // if associated with cluster, no need to validate associated properties
+    Long cluster = viewInstanceDefinition.getClusterHandle();
+    if (cluster != null) {
+      return ValidationResult.SUCCESS;
+    }
+
+    // Cluster associated properties
+    if (property.equals(WEBHDFS_URL)) {
+      String webhdfsUrl = viewInstanceDefinition.getPropertyMap().get(WEBHDFS_URL);
+      if (!ValidatorUtils.validateHdfsURL(webhdfsUrl)) {
+        return new InvalidPropertyValidationResult(false, "Must be valid URL");
+      }
+    }
+
+    if (property.equals(YARN_ATS_URL)) {
+      String atsUrl = viewInstanceDefinition.getPropertyMap().get(YARN_ATS_URL);
+      if (!ValidatorUtils.validateHttpURL(atsUrl)) {
+        return new InvalidPropertyValidationResult(false, "Must be valid URL");
+      }
+    }
+
+    return ValidationResult.SUCCESS;
+  }
+
+  public static class InvalidPropertyValidationResult implements ValidationResult {
+    private boolean valid;
+    private String detail;
+
+    public InvalidPropertyValidationResult(boolean valid, String detail) {
+      this.valid = valid;
+      this.detail = detail;
+    }
+
+    @Override
+    public boolean isValid() {
+      return valid;
+    }
+
+    @Override
+    public String getDetail() {
+      return detail;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
new file mode 100644
index 0000000..4b49f64
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.hive2.persistence.utils.Indexed;
+
+/**
+ * Minimal {@link Indexed} bean with one payload field and an id.
+ */
+public class TestBean implements Indexed {
+  /** Arbitrary payload. */
+  public String someData;
+  /** Identifier exposed through the {@link Indexed} accessors. */
+  public String id;
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public void setId(String newId) {
+    this.id = newId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
new file mode 100644
index 0000000..9d3517f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.AsyncJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.actor.message.ResultReady;
+import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive2.internal.Either;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Connector actor for asynchronous jobs. It executes the job through the
+ * connection delegate, starts child actors for log aggregation and for
+ * result handling, and reports readiness or failure to the parent actor.
+ */
+public class AsyncJdbcConnector extends JdbcConnector {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  // Child actors started per job; stopped in cleanUpChildren().
+  private ActorRef logAggregator = null;
+  private ActorRef asyncQueryExecutor = null;
+  private ActorRef resultSetActor = null;
+
+
+  public AsyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
+    super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
+  }
+
+  /**
+   * Executes the message payload when it is an {@link AsyncJob}; any other
+   * payload is ignored.
+   */
+  @Override
+  protected void handleJobMessage(HiveMessage message) {
+    Object job = message.getMessage();
+    if (job instanceof AsyncJob) {
+      LOG.debug("Executing async job " + message.toString());
+      execute((AsyncJob) job);
+    }
+  }
+
+  @Override
+  protected boolean isAsync() {
+    return true;
+  }
+
+  /**
+   * Sends a poison pill to every child actor that was started and has not
+   * terminated yet.
+   */
+  @Override
+  protected void cleanUpChildren() {
+    if(logAggregator != null && !logAggregator.isTerminated()) {
+      LOG.debug("Sending poison pill to log aggregator");
+      logAggregator.tell(PoisonPill.getInstance(), self());
+    }
+
+    if(asyncQueryExecutor != null && !asyncQueryExecutor.isTerminated()) {
+      LOG.debug("Sending poison pill to Async Query Executor");
+      asyncQueryExecutor.tell(PoisonPill.getInstance(), self());
+    }
+
+    if(resultSetActor != null && !resultSetActor.isTerminated()) {
+      LOG.debug("Sending poison pill to Resultset Actor");
+      resultSetActor.tell(PoisonPill.getInstance(), self());
+    }
+  }
+
+  /**
+   * Tells the parent that the job failed because no Hive connection is available.
+   */
+  @Override
+  protected void notifyFailure() {
+    AsyncExecutionFailed failure = new AsyncExecutionFailed(jobId,username,"Cannot connect to hive");
+    parent.tell(failure, self());
+  }
+
+  /**
+   * Runs the job: marks it initialized, executes it through the connection
+   * delegate, starts the log aggregator and either a result set actor or an
+   * async query executor, and finally schedules the inactivity check.
+   */
+  private void execute(AsyncJob message) {
+    this.executing = true;
+    this.jobId = message.getJobId();
+    updateJobStatus(jobId,Job.JOB_STATE_INITIALIZED);
+    if (connectable == null) {
+      notifyAndCleanUp();
+      return;
+    }
+
+    Optional<HiveConnection> connectionOptional = connectable.getConnection();
+    if (!connectionOptional.isPresent()) {
+      notifyAndCleanUp();
+      return;
+    }
+
+    try {
+      Optional<ResultSet> resultSetOptional = connectionDelegate.execute(connectionOptional.get(), message);
+      Optional<HiveStatement> currentStatement = connectionDelegate.getCurrentStatement();
+      // There should be a result set, which either has a result set, or an empty value
+      // for operations which do not return anything
+
+      logAggregator = getContext().actorOf(
+        Props.create(LogAggregator.class, system, hdfsApi, currentStatement.get(), message.getLogFile())
+        .withDispatcher("akka.actor.misc-dispatcher"),   message.getJobId() + ":" +"-logAggregator"
+      );
+      deathWatch.tell(new RegisterActor(logAggregator),self());
+
+      updateGuidInJob(jobId, currentStatement.get());
+      updateJobStatus(jobId,Job.JOB_STATE_RUNNING);
+
+      if (resultSetOptional.isPresent()) {
+        // Start a result set aggregator on the same context, a notice to the parent will kill all these as well
+        // tell the result holder to assign the result set for further operations
+        resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+          resultSetOptional.get(),storage).withDispatcher("akka.actor.result-dispatcher"),
+          "ResultSetActor:ResultSetIterator:JobId:"+ jobId );
+        deathWatch.tell(new RegisterActor(resultSetActor),self());
+        parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>left(resultSetActor)), self());
+
+        // Start a actor to query ATS
+      } else {
+        // Case when this is an Update/query with no results
+        // Wait for operation to complete and add results;
+
+        // Assign the field (not a shadowing local) so cleanUpChildren() can stop this child
+        asyncQueryExecutor = getContext().actorOf(
+                Props.create(AsyncQueryExecutor.class, parent, currentStatement.get(),storage,jobId,username)
+                  .withDispatcher("akka.actor.result-dispatcher"),
+                 message.getJobId() + "-asyncQueryExecutor");
+        deathWatch.tell(new RegisterActor(asyncQueryExecutor),self());
+        parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>right(asyncQueryExecutor)), self());
+
+      }
+      // Start a actor to query log
+      logAggregator.tell(new StartLogAggregation(), self());
+
+
+    } catch (SQLException e) {
+      // update the error on the log
+      AsyncExecutionFailed failure = new AsyncExecutionFailed(message.getJobId(),username,
+              e.getMessage(), e);
+      updateJobStatus(jobId,Job.JOB_STATE_ERROR);
+      parent.tell(failure, self());
+      // Update the operation controller to write an error on the right side
+      LOG.error("Caught SQL excpetion for job-"+message,e);
+
+    }
+
+    // Start Inactivity timer to close the statement
+    this.inactivityScheduler = system.scheduler().schedule(
+      Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
+      this.self(), new InactivityCheck(), system.dispatcher(), null);
+  }
+
+  /**
+   * Marks the job as failed, notifies the parent and releases this connector.
+   */
+  private void notifyAndCleanUp() {
+    updateJobStatus(jobId, Job.JOB_STATE_ERROR);
+    notifyFailure();
+    cleanUp();
+  }
+
+  /**
+   * Loads the job from storage, sets its status and stores it back.
+   * When the job cannot be found the failure is logged and nothing is stored.
+   *
+   * @param jobId id of the job to update
+   * @param jobState new status value
+   */
+  private void updateJobStatus(String jobId, String jobState) {
+    JobImpl job;
+    try {
+      job = storage.load(JobImpl.class, jobId);
+    } catch (ItemNotFound itemNotFound) {
+      // Without a stored job there is nothing to update; do not dereference null
+      LOG.error("Cannot set status " + jobState + " for job " + jobId + ": job not found", itemNotFound);
+      return;
+    }
+    job.setStatus(jobState);
+    storage.store(JobImpl.class, job);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
new file mode 100644
index 0000000..2a1dbb3
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.internal.AsyncExecutionSuccess;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Actor that waits for a statement without a result set to finish and
+ * records the outcome on the stored job.
+ */
+public class AsyncQueryExecutor extends HiveActor {
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private Statement statement;
+  private final Storage storage;
+  private final String jobId;
+  private final ActorRef parent;
+  private final String userName;
+
+  public AsyncQueryExecutor(ActorRef parent, Statement statement, Storage storage, String jobId,String userName) {
+    this.parent = parent;
+    this.statement = statement;
+    this.storage = storage;
+    this.jobId = jobId;
+    this.userName = userName;
+  }
+
+  /**
+   * Reacts to {@link ExecuteQuery}; every other payload is ignored.
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    final Object payload = hiveMessage.getMessage();
+    if (!(payload instanceof ExecuteQuery)) {
+      return;
+    }
+    executeQuery();
+  }
+
+  /**
+   * Loads the job, asks the statement for its update count and stores the
+   * job as finished, replying to the sender with success. A SQL failure
+   * stores the job as errored and replies with the failure; a missing job
+   * only replies with the failure. The parent is always told to clean up.
+   */
+  private void executeQuery() {
+    JobImpl storedJob = null;
+    try {
+      storedJob = storage.load(JobImpl.class, jobId);
+      statement.getUpdateCount();
+      LOG.info("Job execution successful. Setting status in db.");
+      storedJob.setStatus(Job.JOB_STATE_FINISHED);
+      storage.store(JobImpl.class, storedJob);
+      sender().tell(new AsyncExecutionSuccess(), self());
+    } catch (SQLException sqlFailure) {
+      storedJob.setStatus(Job.JOB_STATE_ERROR);
+      AsyncExecutionFailed failed = new AsyncExecutionFailed(jobId,userName, sqlFailure.getMessage(), sqlFailure);
+      sender().tell(failed, self());
+      storage.store(JobImpl.class, storedJob);
+    } catch (ItemNotFound missingJob) {
+      AsyncExecutionFailed failed = new AsyncExecutionFailed(jobId,userName, "Cannot load job", missingJob);
+      sender().tell(failed, self());
+    } finally {
+      // the connection is no longer needed once the outcome is known
+      parent.tell(new CleanUp(), self());
+    }
+  }
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
new file mode 100644
index 0000000..c146dd0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+/**
+ * Actor that watches every actor registered with it and logs both the
+ * registration and the later termination notice.
+ */
+public class DeathWatch extends HiveActor {
+
+    private final static Logger LOG =
+            LoggerFactory.getLogger(DeathWatch.class);
+
+    @Override
+    void handleMessage(HiveMessage hiveMessage) {
+        final Object payload = hiveMessage.getMessage();
+        if (payload instanceof RegisterActor) {
+            register((RegisterActor) payload);
+        }
+        if (payload instanceof Terminated) {
+            logTermination((Terminated) payload);
+        }
+    }
+
+    /** Starts watching the actor carried by the registration message. */
+    private void register(RegisterActor registration) {
+        ActorRef watched = registration.getActorRef();
+        this.getContext().watch(watched);
+        LOG.info("Registered new actor "+ watched);
+        LOG.info("Registration for {} at {}", watched,new Date());
+    }
+
+    /** Logs the termination notice received for a watched actor. */
+    private void logTermination(Terminated notice) {
+        ActorRef stopped = notice.actor();
+        LOG.info("Received terminate for actor "+ stopped);
+        LOG.info("Termination for {} at {}", stopped,new Date());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
new file mode 100644
index 0000000..c2ee5c7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+/**
+ * Immutable message identifying a job by its id and the user who owns it.
+ */
+public class GetResultHolder {
+
+    // final: the message is never modified after construction
+    private final String jobId;
+    private final String userName;
+
+    /**
+     * @param jobId id of the job
+     * @param userName name of the user
+     */
+    public GetResultHolder(String jobId, String userName) {
+        this.jobId = jobId;
+        this.userName = userName;
+    }
+
+
+    /**
+     * @return the job id given at construction
+     */
+    public String getJobId() {
+        return jobId;
+    }
+
+    /**
+     * @return the user name given at construction
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    @Override
+    public String toString() {
+        return "GetResultHolder{" +
+                "jobId='" + jobId + '\'' +
+                ", userName='" + userName + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
new file mode 100644
index 0000000..63e5dd1
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the view's actors. Every incoming message is wrapped in a
+ * {@link HiveMessage}, passed to {@link #handleMessage(HiveMessage)} and
+ * traced at debug level before and after handling.
+ */
+public abstract class HiveActor extends UntypedActor {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    @Override
+    final public void onReceive(Object message) throws Exception {
+        final HiveMessage wrapped = new HiveMessage(message);
+        traceReceived(message, wrapped);
+        handleMessage(wrapped);
+        traceSubmitted(wrapped);
+    }
+
+    /** Debug trace written before the message is handled. */
+    private void traceReceived(Object message, HiveMessage wrapped) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        LOG.debug("Received message: " + message.getClass().getName() + ", generated id: " + wrapped.getId() +
+                " sent by: " + sender() + ", recieved by" + self());
+    }
+
+    /** Debug trace written after the message has been handled. */
+    private void traceSubmitted(HiveMessage wrapped) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        LOG.debug("Message submitted: " + wrapped.getId());
+    }
+
+    /**
+     * Handles one wrapped message.
+     * @param hiveMessage wrapper around the message received by this actor
+     */
+    abstract void handleMessage(HiveMessage hiveMessage);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
new file mode 100644
index 0000000..7769dde
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.Connect;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.TerminateInactivityCheck;
+import org.apache.ambari.view.hive2.internal.Connectable;
+import org.apache.ambari.view.hive2.internal.ConnectionException;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive2.utils.HiveActorConfiguration;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wraps one Jdbc connection per user, per instance. This is used to delegate execute the statements and
+ * creates child actors to delegate the resultset extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation
+ */
+public abstract class JdbcConnector extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  /**
+   * Interval for maximum inactivity allowed
+   */
+  private final static long MAX_INACTIVITY_INTERVAL = 5 * 60 * 1000;
+
+  /**
+   * Interval for maximum inactivity allowed before termination
+   */
+  private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000;
+
+  protected final ViewContext viewContext;
+  protected final ActorSystem system;
+  protected final Storage storage;
+
+  /**
+   * Keeps track of the timestamp when the last activity has happened. This is
+   * used to calculate the inactivity period and take lifecycle decisions based
+   * on it.
+   */
+  private long lastActivityTimestamp;
+
+  /**
+   * Akka scheduler to tick at an interval to deal with inactivity of this actor
+   */
+  protected Cancellable inactivityScheduler;
+
+  /**
+   * Akka scheduler to tick at an interval to deal with the inactivity after which
+   * the actor should be killed and connectable should be released
+   */
+  protected Cancellable terminateActorScheduler;
+
+  protected Connectable connectable = null;
+  protected final ActorRef deathWatch;
+  protected final ConnectionDelegate connectionDelegate;
+  protected final ActorRef parent;
+  protected final HdfsApi hdfsApi;
+
+  /**
+   * true if the actor is currently executing any job.
+   */
+  protected boolean executing = false;
+
+  /**
+   * true if the currently executing job is async job.
+   */
+  private boolean async = true;
+
+  /**
+   * Returns the timeout configurations.
+   */
+  private final HiveActorConfiguration actorConfiguration;
+  protected String username;
+  protected String jobId;
+
+  public JdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent, ActorRef deathWatch,
+                       ConnectionDelegate connectionDelegate, Storage storage) {
+    this.viewContext = viewContext;
+    this.hdfsApi = hdfsApi;
+    this.system = system;
+    this.parent = parent;
+    this.deathWatch = deathWatch;
+    this.connectionDelegate = connectionDelegate;
+    this.storage = storage;
+    this.lastActivityTimestamp = System.currentTimeMillis();
+    actorConfiguration = new HiveActorConfiguration(viewContext);
+  }
+
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof InactivityCheck) {
+      checkInactivity();
+    } else if (message instanceof TerminateInactivityCheck) {
+      checkTerminationInactivity();
+    } else if (message instanceof KeepAlive) {
+      keepAlive();
+    } else if (message instanceof CleanUp) {
+      cleanUp();
+    } else if (message instanceof JobExecutionCompleted) {
+      jobExecutionCompleted();
+    } else {
+      handleNonLifecycleMessage(hiveMessage);
+    }
+  }
+
+  private void handleNonLifecycleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    keepAlive();
+    if (message instanceof Connect) {
+      connect((Connect) message);
+    } else {
+      handleJobMessage(hiveMessage);
+    }
+
+  }
+
+  protected abstract void handleJobMessage(HiveMessage message);
+
+  protected abstract boolean isAsync();
+
+  protected abstract void notifyFailure();
+
+  protected abstract void cleanUpChildren();
+
+  private void keepAlive() {
+    lastActivityTimestamp = System.currentTimeMillis();
+  }
+
+  private void jobExecutionCompleted() {
+    // Set is executing as false so that the inactivity checks can finish cleanup
+    // after timeout
+    LOG.info("Job execution completed for user: {}. Results are ready to be fetched", username);
+    this.executing = false;
+  }
+
+  protected Optional<String> getJobId() {
+    return Optional.fromNullable(jobId);
+  }
+
+  protected Optional<String> getUsername() {
+    return Optional.fromNullable(username);
+  }
+
+  private void connect(Connect message) {
+    this.username = message.getUsername();
+    // check the connectable
+    if (connectable == null) {
+      connectable = message.getConnectable();
+    }
+    // make the connectable to Hive
+    try {
+      if (!connectable.isOpen()) {
+        connectable.connect();
+      }
+    } catch (ConnectionException e) {
+      // set up job failure
+      // notify parent about job failure
+      this.notifyFailure();
+      cleanUp();
+      return;
+    }
+
+    this.terminateActorScheduler = system.scheduler().schedule(
+      Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
+      this.getSelf(), new TerminateInactivityCheck(), system.dispatcher(), null);
+
+  }
+
+  protected void updateGuidInJob(String jobId, HiveStatement statement) {
+    String yarnAtsGuid = statement.getYarnATSGuid();
+    try {
+      JobImpl job = storage.load(JobImpl.class, jobId);
+      job.setGuid(yarnAtsGuid);
+      storage.store(JobImpl.class, job);
+    } catch (ItemNotFound itemNotFound) {
+      // Cannot do anything if the job is not present
+    }
+  }
+
+  private void checkInactivity() {
+    LOG.info("Inactivity check, executing status: {}", executing);
+    if (executing) {
+      keepAlive();
+      return;
+    }
+    long current = System.currentTimeMillis();
+    if ((current - lastActivityTimestamp) > actorConfiguration.getInactivityTimeout(MAX_INACTIVITY_INTERVAL)) {
+      // Stop all the sub-actors created
+      cleanUp();
+    }
+  }
+
+  private void checkTerminationInactivity() {
+    if (!isAsync()) {
+      // Should not terminate if job is sync. Will terminate after the job is finished.
+      stopTeminateInactivityScheduler();
+      return;
+    }
+
+    LOG.info("Termination check, executing status: {}", executing);
+    if (executing) {
+      keepAlive();
+      return;
+    }
+
+    long current = System.currentTimeMillis();
+    if ((current - lastActivityTimestamp) > actorConfiguration.getTerminationTimeout(MAX_TERMINATION_INACTIVITY_INTERVAL)) {
+      cleanUpWithTermination();
+    }
+  }
+
+  protected void cleanUp() {
+    if(jobId != null) {
+      LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId);
+    } else {
+      LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name());
+    }
+    this.executing = false;
+    cleanUpStatementAndResultSet();
+    cleanUpChildren();
+    stopInactivityScheduler();
+    parent.tell(new FreeConnector(username, jobId, isAsync()), self());
+  }
+
+  protected void cleanUpWithTermination() {
+    LOG.debug("{} :: Cleaning up resources with inactivity for Sync execution.", self().path().name());
+    cleanUpStatementAndResultSet();
+
+    cleanUpChildren();
+    stopInactivityScheduler();
+    stopTeminateInactivityScheduler();
+    parent.tell(new DestroyConnector(username, jobId, isAsync()), this.self());
+    self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+  }
+
+
+  private void cleanUpStatementAndResultSet() {
+    connectionDelegate.closeStatement();
+    connectionDelegate.closeResultSet();
+  }
+
+  private void stopTeminateInactivityScheduler() {
+    if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) {
+      terminateActorScheduler.cancel();
+    }
+  }
+
+  private void stopInactivityScheduler() {
+    if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) {
+      inactivityScheduler.cancel();
+    }
+  }
+
+  @Override
+  public void postStop() throws Exception {
+    stopInactivityScheduler();
+    stopTeminateInactivityScheduler();
+
+    if (connectable.isOpen()) {
+      connectable.disconnect();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
new file mode 100644
index 0000000..284345d
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Cancellable;
+import com.google.common.base.Joiner;
+import org.apache.ambari.view.hive2.actor.message.GetMoreLogs;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.LogAggregationFinished;
+import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsApiException;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads the logs for a ExecuteJob from the Statement and writes them into hdfs.
+ */
+public class LogAggregator extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  public static final int AGGREGATION_INTERVAL = 5 * 1000;
+  private final HdfsApi hdfsApi;
+  private final HiveStatement statement;
+  private final String logFile;
+  private final ActorSystem system;
+
+  private Cancellable moreLogsScheduler;
+  private ActorRef parent;
+
+  public LogAggregator(ActorSystem system, HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+    this.system = system;
+    this.hdfsApi = hdfsApi;
+    this.statement = statement;
+    this.logFile = logFile;
+  }
+
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof StartLogAggregation) {
+      start();
+    }
+
+    if (message instanceof GetMoreLogs) {
+      try {
+        getMoreLogs();
+      } catch (SQLException e) {
+        LOG.error("SQL Error while getting logs. Tried writing to: {}", logFile);
+      } catch (HdfsApiException e) {
+        LOG.warn("HDFS Error while getting writing logs to {}", logFile);
+
+      }
+    }
+  }
+
+  private void start() {
+    parent = this.getSender();
+    this.moreLogsScheduler = system.scheduler().schedule(
+      Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+      this.getSelf(), new GetMoreLogs(), system.dispatcher(), null);
+  }
+
+  private void getMoreLogs() throws SQLException, HdfsApiException {
+    if (statement.hasMoreLogs()) {
+      List<String> logs = statement.getQueryLog();
+      String allLogs = Joiner.on("\n").skipNulls().join(logs);
+      HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+    } else {
+      moreLogsScheduler.cancel();
+      parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+    }
+  }
+
+  @Override
+  public void postStop() throws Exception {
+    if(moreLogsScheduler != null && !moreLogsScheduler.isCancelled()){
+      moreLogsScheduler.cancel();
+    }
+
+  }
+
+}