You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:05:00 UTC

[19/50] [abbrv] ambari git commit: AMBARI-19321 : Hive View 2.0 - Minimal view for Hive which includes new UI changes. Also made changes in poms as required (nitirajrathore)

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
new file mode 100644
index 0000000..37f24d2
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Actor that tracks the tables of one database and tells a per-table
+ * {@link TableChangeNotifier} child actor when a table is added or removed.
+ * <p>
+ * {@link TableUpdated} messages are collected into a pending snapshot. An
+ * {@link AllTablesUpdated} message diffs that snapshot against the tables
+ * known so far, notifies the affected table actors and clears the snapshot.
+ */
+public class DatabaseChangeNotifier extends HiveActor {
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  /** Name carried by the most recent {@link DatabaseAdded} message. */
+  private String currentDatabaseName;
+  /** Tables known after the last processed {@link AllTablesUpdated}, keyed by table name. */
+  private final Map<String, TableWrapper> tables = new HashMap<>();
+  /** Tables reported since the last {@link AllTablesUpdated}, keyed by table name. */
+  private final Map<String, TableInfo> newTables = new HashMap<>();
+
+  /**
+   * Dispatches on the type of the wrapped message; unknown message types are ignored.
+   *
+   * @param hiveMessage wrapper around the received message
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof DatabaseAdded) {
+      handleDatabaseAdded((DatabaseAdded) message);
+    } else if (message instanceof DatabaseRemoved) {
+      handleDatabaseRemoved((DatabaseRemoved) message);
+    } else if (message instanceof TableUpdated) {
+      handleTableUpdated((TableUpdated) message);
+    } else if (message instanceof AllTablesUpdated) {
+      handleAllTableUpdated((AllTablesUpdated) message);
+    }
+  }
+
+  private void handleDatabaseAdded(DatabaseAdded message) {
+    LOG.info("Database Added: {}", message.name);
+    currentDatabaseName = message.name;
+    // TODO: Send event to eventbus
+  }
+
+  private void handleDatabaseRemoved(DatabaseRemoved message) {
+    LOG.info("Database Removed: {}", message.name);
+    // TODO: Send event to eventbus
+  }
+
+  /** Records the table in the pending snapshot; it is applied on the next {@link AllTablesUpdated}. */
+  private void handleTableUpdated(TableUpdated message) {
+    newTables.put(message.info.getName(), message.info);
+    LOG.info("Table updated: {}. Pending tables: {}", message.info.getName(), newTables.size());
+  }
+
+  /**
+   * Diffs the pending snapshot against the known tables, notifies the table
+   * actors and then clears the pending snapshot.
+   */
+  private void handleAllTableUpdated(AllTablesUpdated message) {
+    // Work on copies: the Guava set operations below return live views, and
+    // 'tables' is modified while those views are being iterated.
+    Set<String> oldTableNames = new HashSet<>(tables.keySet());
+    Set<String> newTableNames = new HashSet<>(newTables.keySet());
+
+    Set<String> tablesAdded = Sets.difference(newTableNames, oldTableNames);
+    Set<String> tablesRemoved = Sets.difference(oldTableNames, newTableNames);
+    Set<String> tablesUpdated = Sets.intersection(oldTableNames, newTableNames);
+
+    updateTablesAdded(tablesAdded);
+    updateTablesRemoved(tablesRemoved);
+    updateTablesUpdated(tablesUpdated);
+    // Must happen last: updateTablesAdded reads the TableInfo from newTables.
+    newTables.clear();
+  }
+
+  /** Creates a table actor for every new table and sends it a TableAdded message. */
+  private void updateTablesAdded(Set<String> tablesAdded) {
+    for (String tableName : tablesAdded) {
+      TableWrapper wrapper = new TableWrapper(tableName);
+      tables.put(tableName, wrapper);
+      wrapper.getTableNotifier().tell(new TableChangeNotifier.TableAdded(newTables.get(tableName)), getSelf());
+    }
+  }
+
+  /** Forgets every removed table, sends its actor a TableRemoved message and then stops it. */
+  private void updateTablesRemoved(Set<String> tablesRemoved) {
+    for (String tableName : tablesRemoved) {
+      TableWrapper tableWrapper = tables.remove(tableName);
+      ActorRef tableNotifier = tableWrapper.getTableNotifier();
+      tableNotifier.tell(new TableChangeNotifier.TableRemoved(tableName), getSelf());
+      tableNotifier.tell(PoisonPill.getInstance(), getSelf());
+    }
+  }
+
+  /** Placeholder: nothing is done yet for tables present both before and after a refresh. */
+  private void updateTablesUpdated(Set<String> tablesUpdated) {
+    // TODO: Check what needs to be done here.
+  }
+
+  public static Props props() {
+    return Props.create(DatabaseChangeNotifier.class);
+  }
+
+  /**
+   * Pairs a table name with the child actor created for that table. Not static
+   * because the constructor creates the child through the enclosing actor's context.
+   */
+  public class TableWrapper {
+    private final String tableName;
+    private final ActorRef tableNotifier;
+
+    private TableWrapper(String tableName) {
+      this.tableName = tableName;
+      this.tableNotifier = getContext().actorOf(TableChangeNotifier.props());
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public ActorRef getTableNotifier() {
+      return tableNotifier;
+    }
+  }
+
+  /** Message: a database with the given name was added. */
+  public static class DatabaseAdded {
+    private final String name;
+
+    public DatabaseAdded(String name) {
+      this.name = name;
+    }
+  }
+
+
+  /** Message: the database with the given name was removed. */
+  public static class DatabaseRemoved {
+    private final String name;
+
+    public DatabaseRemoved(String name) {
+      this.name = name;
+    }
+  }
+
+  /** Message: carries the refreshed information of one table. */
+  public static class TableUpdated {
+    private final TableInfo info;
+
+    public TableUpdated(TableInfo info) {
+      this.info = info;
+    }
+  }
+
+  /** Message: marks the end of a batch of {@link TableUpdated} messages for a database. */
+  public static class AllTablesUpdated {
+    private final String database;
+
+    public AllTablesUpdated(String database) {
+      this.database = database;
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
new file mode 100644
index 0000000..6dc4ad9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.AuthParams;
+import org.apache.ambari.view.hive20.ConnectionFactory;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.client.ConnectionConfig;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.HiveConnectionWrapper;
+import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages database related state, queries Hive to get the list of databases and then manages state for each database.
+ * Also, periodically updates the list of databases by calling hive.
+ */
+public class DatabaseManager extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final Connectable connectable;
+
+  private final ActorRef metaDataRetriever;
+  private final String username;
+
+  private boolean refreshInProgress = false;
+  private boolean selfRefreshQueued = false;
+
+  private Map<String, DatabaseWrapper> databases = new HashMap<>();
+  private Set<String> databasesToUpdate;
+
+
+  public DatabaseManager(String username, Connectable connectable) {
+    this.username = username;
+    this.connectable = connectable;
+    metaDataRetriever = getContext().actorOf(MetaDataRetriever.props(connectable));
+  }
+
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+
+    Object message = hiveMessage.getMessage();
+    if (message instanceof Refresh) {
+      handleRefresh();
+    } else if (message instanceof SelfRefresh) {
+      handleSelfRefresh();
+    } else if (message instanceof MetaDataRetriever.DBRefreshed) {
+      handleDBRefreshed((MetaDataRetriever.DBRefreshed) message);
+    } else if (message instanceof MetaDataRetriever.TableRefreshed) {
+      handleTableRefreshed((MetaDataRetriever.TableRefreshed) message);
+    } else if (message instanceof MetaDataRetriever.AllTableRefreshed) {
+      handleAllTableRefeshed((MetaDataRetriever.AllTableRefreshed) message);
+    } else if (message instanceof GetDatabases) {
+      handleGetDatabases((GetDatabases) message);
+    }
+
+  }
+
+  private void handleSelfRefresh() {
+    if (refreshInProgress) {
+      getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+          getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf());
+    } else {
+      selfRefreshQueued = false;
+      refresh();
+    }
+  }
+
+  private void handleRefresh() {
+    if (refreshInProgress && selfRefreshQueued) {
+      return; // We will not honor refresh message when a refresh is going on and another self refresh is queued in mailbox
+    } else if (refreshInProgress) {
+      selfRefreshQueued = true; // If refresh is in progress, we will queue up only one refresh message.
+      getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+          getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf());
+    } else {
+      refresh();
+    }
+  }
+
+  private void handleDBRefreshed(MetaDataRetriever.DBRefreshed message) {
+    Set<DatabaseInfo> databasesInfos = message.getDatabases();
+    Set<String> currentDatabases = new HashSet<>(databases.keySet());
+    Set<String> newDatabases = FluentIterable.from(databasesInfos).transform(new Function<DatabaseInfo, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable DatabaseInfo databaseInfo) {
+        return databaseInfo.getName();
+      }
+    }).toSet();
+
+    databasesToUpdate = new HashSet<>(newDatabases);
+
+    Set<String> databasesAdded = Sets.difference(newDatabases, currentDatabases);
+    Set<String> databasesRemoved = Sets.difference(currentDatabases, newDatabases);
+
+    updateDatabasesAdded(databasesAdded, databasesInfos);
+    updateDatabasesRemoved(databasesRemoved);
+  }
+
+  private void updateDatabasesAdded(Set<String> databasesAdded, Set<DatabaseInfo> databasesInfos) {
+    for (DatabaseInfo info : databasesInfos) {
+      if (databasesAdded.contains(info.getName())) {
+        DatabaseWrapper wrapper = new DatabaseWrapper(info);
+        databases.put(info.getName(), wrapper);
+        wrapper.getDatabaseNotifier().tell(new DatabaseChangeNotifier.DatabaseAdded(info.getName()), getSelf());
+      }
+    }
+  }
+
+  private void updateDatabasesRemoved(Set<String> databasesRemoved) {
+    for (String database : databasesRemoved) {
+      DatabaseWrapper wrapper = databases.remove(database);
+      ActorRef notifier = wrapper.getDatabaseNotifier();
+      notifier.tell(new DatabaseChangeNotifier.DatabaseRemoved(database), getSelf());
+      notifier.tell(PoisonPill.getInstance(), getSelf());
+    }
+  }
+
+  private void handleTableRefreshed(MetaDataRetriever.TableRefreshed message) {
+    ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase());
+    updateTable(message.getDatabase(), message.getTable());
+    databaseChangeNotifier.tell(new DatabaseChangeNotifier.TableUpdated(message.getTable()), getSelf());
+  }
+
+  private void handleAllTableRefeshed(MetaDataRetriever.AllTableRefreshed message) {
+    ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase());
+    databaseChangeNotifier.tell(new DatabaseChangeNotifier.AllTablesUpdated(message.getDatabase()), getSelf());
+    if (checkIfAllTablesOfAllDatabaseRefeshed(message)) {
+      refreshInProgress = false;
+    }
+  }
+
+  private void handleGetDatabases(GetDatabases message) {
+    if (refreshInProgress) {
+      // If currently refreshing, then schedule the same message after 500 milliseconds
+      getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+          getSelf(), message, getContext().dispatcher(), getSender());
+      return;
+    }
+    Set<DatabaseInfo> infos = new HashSet<>();
+    for (DatabaseWrapper wrapper : databases.values()) {
+      infos.add(wrapper.getDatabase());
+    }
+    getSender().tell(new DatabasesResult(infos), getSelf());
+  }
+
+  private boolean checkIfAllTablesOfAllDatabaseRefeshed(MetaDataRetriever.AllTableRefreshed message) {
+    databasesToUpdate.remove(message.getDatabase());
+    return databasesToUpdate.isEmpty();
+  }
+
+  private ActorRef getDatabaseChangeNotifier(String databaseName) {
+    DatabaseWrapper wrapper = databases.get(databaseName);
+    ActorRef databaseChangeNotifier = null;
+    if (wrapper != null) {
+      databaseChangeNotifier = wrapper.getDatabaseNotifier();
+    }
+    return databaseChangeNotifier;
+  }
+
+  private void refresh() {
+    LOG.info("Received refresh for user");
+    refreshInProgress = true;
+    metaDataRetriever.tell(new MetaDataRetriever.RefreshDB(), getSelf());
+
+    scheduleRefreshAfter(1, TimeUnit.MINUTES);
+  }
+
+  private void scheduleRefreshAfter(long time, TimeUnit timeUnit) {
+    getContext().system().scheduler().scheduleOnce(Duration.create(time, timeUnit),
+        getSelf(), new Refresh(username), getContext().dispatcher(), getSelf());
+  }
+
+  @Override
+  public void postStop() throws Exception {
+    LOG.info("Database Manager stopped!!!");
+    connectable.disconnect();
+  }
+
+  private void updateTable(String databaseName, TableInfo table) {
+    DatabaseWrapper wrapper = databases.get(databaseName);
+    if (wrapper != null) {
+      DatabaseInfo info = wrapper.getDatabase();
+      info.getTables().add(table);
+    }
+  }
+
+  public static Props props(ViewContext context) {
+    ConnectionConfig config = ConnectionFactory.create(context);
+    Connectable connectable = new HiveConnectionWrapper(config.getJdbcUrl(), config.getUsername(), config.getPassword(), new AuthParams(context));
+    return Props.create(DatabaseManager.class, config.getUsername(), connectable);
+  }
+
+  public static class Refresh {
+    private final String username;
+
+    public Refresh(String username) {
+      this.username = username;
+    }
+
+    public String getUsername() {
+      return username;
+    }
+  }
+
+  private static class SelfRefresh {
+  }
+
+  private class DatabaseWrapper {
+    private final DatabaseInfo database;
+    private final ActorRef databaseNotifier;
+
+    private DatabaseWrapper(DatabaseInfo database) {
+      this.database = database;
+      databaseNotifier = getContext().actorOf(DatabaseChangeNotifier.props());
+    }
+
+    public DatabaseInfo getDatabase() {
+      return database;
+    }
+
+    public ActorRef getDatabaseNotifier() {
+      return databaseNotifier;
+    }
+  }
+
+  public static class GetDatabases {
+    private final String username;
+
+    public GetDatabases(String username) {
+      this.username = username;
+    }
+
+    public String getUsername() {
+      return username;
+    }
+  }
+
+  public static class DatabasesResult {
+    private final Set<DatabaseInfo> databases;
+
+    public DatabasesResult(Set<DatabaseInfo> databases) {
+      this.databases = databases;
+    }
+
+    public Set<DatabaseInfo> getDatabases() {
+      return databases;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
new file mode 100644
index 0000000..58cefcd
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.RegisterActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+/**
+ * Actor that watches the actors registered with it and logs both their
+ * registration and their termination.
+ */
+public class DeathWatch extends HiveActor {
+
+    private final static Logger LOG =
+            LoggerFactory.getLogger(DeathWatch.class);
+
+    /**
+     * Handles {@link RegisterActor} and {@link Terminated} messages; any other
+     * message is ignored.
+     *
+     * @param hiveMessage wrapper around the received message
+     */
+    @Override
+    public void handleMessage(HiveMessage hiveMessage) {
+        final Object payload = hiveMessage.getMessage();
+        if (payload instanceof RegisterActor) {
+            register(((RegisterActor) payload).getActorRef());
+        }
+        if (payload instanceof Terminated) {
+            logTermination(((Terminated) payload).actor());
+        }
+    }
+
+    /** Starts watching the given actor and logs the registration. */
+    private void register(ActorRef watched) {
+        getContext().watch(watched);
+        LOG.info("Registered new actor "+ watched);
+        LOG.info("Registration for {} at {}", watched, new Date());
+    }
+
+    /** Logs that the given watched actor has terminated. */
+    private void logTermination(ActorRef stopped) {
+        LOG.info("Received terminate for actor "+ stopped);
+        LOG.info("Termination for {} at {}", stopped, new Date());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
new file mode 100644
index 0000000..384b798
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the Hive view actors. Wraps every received message in a
+ * {@link HiveMessage}, hands it to {@link #handleMessage(HiveMessage)} and
+ * writes a debug log entry before and after the handling.
+ */
+public abstract class HiveActor extends UntypedActor {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Wraps the raw message and delegates to {@link #handleMessage(HiveMessage)}.
+     *
+     * @param message the message delivered by Akka
+     * @throws Exception declared by the overridden method
+     */
+    @Override
+    final public void onReceive(Object message) throws Exception {
+        HiveMessage hiveMessage = new HiveMessage(message);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received message: {}, generated id: {} sent by: {}, received by: {}",
+                    message.getClass().getName(), hiveMessage.getId(), sender(), self());
+        }
+
+        handleMessage(hiveMessage);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Message submitted: {}", hiveMessage.getId());
+        }
+    }
+
+    /**
+     * Processes one received message.
+     *
+     * @param hiveMessage wrapper around the received message
+     */
+    public abstract void handleMessage(HiveMessage hiveMessage);
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
new file mode 100644
index 0000000..ce58c8c
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.AuthParams;
+import org.apache.ambari.view.hive20.ConnectionDelegate;
+import org.apache.ambari.view.hive20.actor.message.Connect;
+import org.apache.ambari.view.hive20.actor.message.FetchError;
+import org.apache.ambari.view.hive20.actor.message.FetchResult;
+import org.apache.ambari.view.hive20.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive20.actor.message.HiveJob;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.ResultInformation;
+import org.apache.ambari.view.hive20.actor.message.ResultNotReady;
+import org.apache.ambari.view.hive20.actor.message.RunStatement;
+import org.apache.ambari.view.hive20.actor.message.SQLStatementJob;
+import org.apache.ambari.view.hive20.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive20.actor.message.job.ExecuteNextStatement;
+import org.apache.ambari.view.hive20.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive20.actor.message.job.Failure;
+import org.apache.ambari.view.hive20.actor.message.job.NoResult;
+import org.apache.ambari.view.hive20.actor.message.job.ResultSetHolder;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.actor.message.job.SaveGuidToDB;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.TerminateInactivityCheck;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.ConnectionException;
+import org.apache.ambari.view.hive20.persistence.Storage;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.utils.HiveActorConfiguration;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wraps one JDBC connection per user, per instance. It executes the statements through that connection and
+ * creates child actors to which it delegates ResultSet extraction, YARN/ATS querying for ExecuteJob info and log aggregation.
+ */
+public class JdbcConnector extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  // NOTE(review): the use of this constant is not visible in this part of the file.
+  public static final String SUFFIX = "validating the login";
+
+  /**
+   * Interval for maximum inactivity allowed.
+   * Presumably in milliseconds (5 * 60 * 1000 would then be five minutes) - confirm against its use.
+   */
+  private final static long MAX_INACTIVITY_INTERVAL = 5 * 60 * 1000;
+
+  /**
+   * Interval for maximum inactivity allowed before termination.
+   * Presumably in milliseconds (10 * 60 * 1000 would then be ten minutes) - confirm against its use.
+   */
+  private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000;
+
+  private static final long MILLIS_IN_SECOND = 1000L;
+
+  private final Storage storage;
+
+  /**
+   * Keeps track of the timestamp when the last activity has happened. This is
+   * used to calculate the inactivity period and take lifecycle decisions based
+   * on it. Initialised from System.currentTimeMillis() in the constructor.
+   */
+  private long lastActivityTimestamp;
+
+  /**
+   * Akka scheduler to tick at an interval to deal with inactivity of this actor
+   */
+  private Cancellable inactivityScheduler;
+
+  /**
+   * Akka scheduler to tick at an interval to deal with the inactivity after which
+   * the actor should be killed and connection should be released
+   */
+  private Cancellable terminateActorScheduler;
+
+  private Connectable connectable = null;
+  private final ActorRef deathWatch;
+  // Used by cancelJob to cancel the running statement.
+  private final ConnectionDelegate connectionDelegate;
+  private final ActorRef parent;
+  private ActorRef statementExecutor = null;
+  private final HdfsApi hdfsApi;
+  private final AuthParams authParams;
+
+  /**
+   * true if the actor is currently executing any job.
+   */
+  private boolean executing = false;
+  private HiveJob.Type executionType = HiveJob.Type.SYNC;
+
+  /**
+   * Holds the timeout configurations; built from the view context in the constructor.
+   */
+  private final HiveActorConfiguration actorConfiguration;
+  private String username;
+  // Absent until a job id is known; processFailure and processCancel only update the job status when present.
+  private Optional<String> jobId = Optional.absent();
+  private Optional<String> logFile = Optional.absent();
+  private int statementsCount = 0;
+
+  // Receives the ExecutionFailed message when a non-async execution fails (see processFailure).
+  private ActorRef commandSender = null;
+
+  // Sent (wrapped in an Optional) as the reply to FetchResult once execution has finished without failure.
+  private ActorRef resultSetIterator = null;
+  // Set together with 'failure' by processFailure; checked by fetchError and fetchResult.
+  private boolean isFailure = false;
+  private Failure failure = null;
+  // Set by cancelJob so that a subsequent failure of an async job is treated as a cancellation.
+  private boolean isCancelCalled = false;
+
+  /**
+   * For every execution, this will hold the statements that are left to execute
+   */
+  private Queue<String> statementQueue = new ArrayDeque<>();
+
+  /**
+   * Stores the collaborators, stamps the creation time as the last activity and
+   * builds the authentication parameters and actor configuration from the view context.
+   */
+  public JdbcConnector(ViewContext viewContext, ActorRef parent, ActorRef deathWatch, HdfsApi hdfsApi,
+                       ConnectionDelegate connectionDelegate, Storage storage) {
+    this.parent = parent;
+    this.deathWatch = deathWatch;
+    this.hdfsApi = hdfsApi;
+    this.storage = storage;
+    this.connectionDelegate = connectionDelegate;
+    this.resultSetIterator = null;
+    this.lastActivityTimestamp = System.currentTimeMillis();
+
+    this.authParams = new AuthParams(viewContext);
+    this.actorConfiguration = new HiveActorConfiguration(viewContext);
+  }
+
+  /**
+   * Handles the lifecycle messages directly and passes every other message on
+   * to {@code handleNonLifecycleMessage}.
+   *
+   * @param hiveMessage wrapper around the received message
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    final Object payload = hiveMessage.getMessage();
+    if (payload instanceof InactivityCheck) {
+      checkInactivity();
+      return;
+    }
+    if (payload instanceof TerminateInactivityCheck) {
+      checkTerminationInactivity();
+      return;
+    }
+    if (payload instanceof KeepAlive) {
+      keepAlive();
+      return;
+    }
+    if (payload instanceof CleanUp) {
+      cleanUp();
+      return;
+    }
+    handleNonLifecycleMessage(hiveMessage);
+  }
+
+  private void handleNonLifecycleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    keepAlive();
+    if (message instanceof Connect) {
+      connect((Connect) message);
+    } else if (message instanceof SQLStatementJob) {
+      runStatementJob((SQLStatementJob) message);
+    } else if (message instanceof GetColumnMetadataJob) {
+      runGetMetaData((GetColumnMetadataJob) message);
+    } else if (message instanceof ExecuteNextStatement) {
+      executeNextStatement();
+    } else if (message instanceof ResultInformation) {
+      gotResultBack((ResultInformation) message);
+    } else if (message instanceof CancelJob) {
+      cancelJob((CancelJob) message);
+    } else if (message instanceof FetchResult) {
+      fetchResult((FetchResult) message);
+    } else if (message instanceof FetchError) {
+      fetchError((FetchError) message);
+    } else if (message instanceof SaveGuidToDB) {
+      saveGuid((SaveGuidToDB) message);
+    } else if (message instanceof SaveDagInformation) {
+      saveDagInformation((SaveDagInformation) message);
+    } else {
+      unhandled(message);
+    }
+  }
+
+  private void fetchError(FetchError message) {
+    if (isFailure) {
+      sender().tell(Optional.of(failure), self());
+      return;
+    }
+    sender().tell(Optional.absent(), self());
+  }
+
+  private void fetchResult(FetchResult message) {
+    if (isFailure) {
+      sender().tell(failure, self());
+      return;
+    }
+
+    if (executing) {
+      sender().tell(new ResultNotReady(jobId.get(), username), self());
+      return;
+    }
+    sender().tell(Optional.fromNullable(resultSetIterator), self());
+  }
+
+  private void cancelJob(CancelJob message) {
+    if (!executing || connectionDelegate == null) {
+      LOG.error("Cannot cancel job for user as currently the job is not running or started. JobId: {}", message.getJobId());
+      return;
+    }
+    LOG.info("Cancelling job for user. JobId: {}, user: {}", message.getJobId(), username);
+    try {
+      isCancelCalled = true;
+      connectionDelegate.cancel();
+    } catch (SQLException e) {
+      LOG.error("Failed to cancel job. JobId: {}. {}", message.getJobId(), e);
+    }
+  }
+
+  private void gotResultBack(ResultInformation message) {
+    Optional<Failure> failureOptional = message.getFailure();
+    if (failureOptional.isPresent()) {
+      Failure failure = failureOptional.get();
+      processFailure(failure);
+      return;
+    }
+    if (statementQueue.size() == 0) {
+      // This is the last resultSet
+      processResult(message.getResultSet());
+    } else {
+      self().tell(new ExecuteNextStatement(), self());
+    }
+  }
+
+  /**
+   * Marks execution as stopped and, for an async job with a job id, stores
+   * the canceled state for that job.
+   */
+  private void processCancel() {
+    executing = false;
+    if (!isAsync() || !jobId.isPresent()) {
+      return;
+    }
+    LOG.error("Job canceled by user for JobId: {}", jobId.get());
+    updateJobStatus(jobId.get(), Job.JOB_STATE_CANCELED);
+  }
+
+  private void processFailure(Failure failure) {
+    executing = false;
+    isFailure = true;
+    this.failure = failure;
+    if (isAsync() && jobId.isPresent()) {
+      if(isCancelCalled) {
+        processCancel();
+        return;
+      }
+      updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+    } else {
+      // Send for sync execution
+      commandSender.tell(new ExecutionFailed(failure.getMessage(), failure.getError()), self());
+      cleanUpWithTermination();
+    }
+  }
+
+  private void processResult(Optional<ResultSet> resultSetOptional) {
+    executing = false;
+
+    LOG.info("Finished processing SQL statements for Job id : {}", jobId.or("SYNC JOB"));
+    if (isAsync() && jobId.isPresent()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_FINISHED);
+    }
+
+    if (resultSetOptional.isPresent()) {
+      ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+        resultSetOptional.get(), isAsync()).withDispatcher("akka.actor.result-dispatcher"),
+        "ResultSetIterator:" + UUID.randomUUID().toString());
+      resultSetIterator = resultSetActor;
+      if (!isAsync()) {
+        commandSender.tell(new ResultSetHolder(resultSetActor), self());
+      }
+    } else {
+      resultSetIterator = null;
+      if (!isAsync()) {
+        commandSender.tell(new NoResult(), self());
+      }
+    }
+  }
+
+  private void executeNextStatement() {
+    if (statementQueue.isEmpty()) {
+      jobExecutionCompleted();
+      return;
+    }
+
+    int index = statementsCount - statementQueue.size();
+    String statement = statementQueue.poll();
+    if (statementExecutor == null) {
+      statementExecutor = getStatementExecutor();
+    }
+
+    if (isAsync()) {
+      statementExecutor.tell(new RunStatement(index, statement, jobId.get(), true, logFile.get(), true), self());
+    } else {
+      statementExecutor.tell(new RunStatement(index, statement), self());
+    }
+  }
+
+  private void runStatementJob(SQLStatementJob message) {
+    executing = true;
+    jobId = message.getJobId();
+    logFile = message.getLogFile();
+    executionType = message.getType();
+    commandSender = getSender();
+
+    resetToInitialState();
+
+    if (!checkConnection()) return;
+
+    for (String statement : message.getStatements()) {
+      statementQueue.add(statement);
+    }
+    statementsCount = statementQueue.size();
+
+    if (isAsync() && jobId.isPresent()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_RUNNING);
+      startInactivityScheduler();
+    }
+    self().tell(new ExecuteNextStatement(), self());
+  }
+
+  /**
+   * Verifies that a Hive connection is available. When it is not, the failure
+   * is reported through {@code notifyConnectFailure}; an unauthorized
+   * connectable is reported with SQL state {@code AUTHFAIL}.
+   *
+   * @return {@code true} if the connectable exists and holds a connection
+   */
+  public boolean checkConnection() {
+    if (connectable == null) {
+      notifyConnectFailure(new SQLException("Hive connection is not created"));
+      return false;
+    }
+
+    if (connectable.getConnection().isPresent()) {
+      return true;
+    }
+
+    final SQLException reason;
+    if (connectable.isUnauthorized()) {
+      reason = new SQLException("Hive Connection not Authorized", "AUTHFAIL");
+    } else {
+      reason = new SQLException("Hive connection is not created");
+    }
+    notifyConnectFailure(reason);
+    return false;
+  }
+
+  private void runGetMetaData(GetColumnMetadataJob message) {
+    if (!checkConnection()) return;
+    resetToInitialState();
+    executing = true;
+    executionType = message.getType();
+    commandSender = getSender();
+    statementExecutor = getStatementExecutor();
+    statementExecutor.tell(message, self());
+  }
+
+  private ActorRef getStatementExecutor() {
+    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
+      .withDispatcher("akka.actor.result-dispatcher"),
+      "StatementExecutor:" + UUID.randomUUID().toString());
+  }
+
+  private boolean isAsync() {
+    return executionType == HiveJob.Type.ASYNC;
+  }
+
+  /**
+   * Records a connection failure and reports it. Async jobs are stored as
+   * errored; sync callers receive an {@code ExecutionFailed} reply. Afterwards
+   * the actor is cleaned up and terminated, except for authorization failures,
+   * which are left in place so the user can be asked for credentials.
+   *
+   * @param ex the exception describing why the connection could not be made
+   */
+  private void notifyConnectFailure(Exception ex) {
+    executing = false;
+    isFailure = true;
+    this.failure = new Failure("Cannot connect to hive", ex);
+    if (isAsync()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+
+      if (ex instanceof ConnectionException) {
+        Throwable cause = ex.getCause();
+        if (cause instanceof SQLException && isLoginError((SQLException) cause)) {
+          return;
+        }
+      }
+
+    } else {
+      sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
+    }
+    // Do not clean up in case of failed authorizations
+    // The failure is bubbled to the user for requesting credentials
+
+    // getSQLState() is null for exceptions created without a state, so compare
+    // from the constant side to avoid a NullPointerException.
+    boolean authFailure = ex instanceof SQLException
+        && "AUTHFAIL".equals(((SQLException) ex).getSQLState());
+    if (!authFailure) {
+      cleanUpWithTermination();
+    }
+  }
+
+  /**
+   * Tells whether the cause of the given exception carries a message ending
+   * with {@code SUFFIX} (compared in lower case).
+   *
+   * @param ce the SQL exception whose cause is inspected
+   * @return {@code true} if the cause message ends with {@code SUFFIX};
+   *         {@code false} when there is no cause or the cause has no message
+   */
+  private boolean isLoginError(SQLException ce) {
+    Throwable cause = ce.getCause();
+    if (cause == null) {
+      return false;
+    }
+    String causeMessage = cause.getMessage();
+    if (causeMessage == null) {
+      return false;
+    }
+    return causeMessage.toLowerCase().endsWith(SUFFIX);
+  }
+
+  private void keepAlive() {
+    lastActivityTimestamp = System.currentTimeMillis();
+  }
+
+  /**
+   * Logs that the job is done and clears the executing flag.
+   */
+  private void jobExecutionCompleted() {
+    LOG.info("Job execution completed for user: {}. Results are ready to be fetched", username);
+    // With the flag cleared, the inactivity checks are free to clean up once
+    // their timeout elapses.
+    executing = false;
+  }
+
+  protected Optional<String> getUsername() {
+    return Optional.fromNullable(username);
+  }
+
+  private void connect(Connect message) {
+    username = message.getUsername();
+    jobId = message.getJobId();
+    executionType = message.getType();
+    // check the connectable
+    if (connectable == null) {
+      connectable = message.getConnectable(authParams);
+    }
+    // make the connectable to Hive
+    try {
+      if (!connectable.isOpen()) {
+        connectable.connect();
+      }
+    } catch (ConnectionException e) {
+      LOG.error("Failed to create a hive connection. {}", e);
+      // set up job failure
+      // notify parent about job failure
+      notifyConnectFailure(e);
+      return;
+    }
+    startTerminateInactivityScheduler();
+  }
+
+  private void updateJobStatus(String jobid, final String status) {
+    new JobSaver(jobid) {
+      @Override
+      protected void update(JobImpl job) {
+        job.setStatus(status);
+        job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
+      }
+    }.save();
+    LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
+  }
+
+  private void saveGuid(final SaveGuidToDB message) {
+    new JobSaver(message.getJobId()) {
+      @Override
+      protected void update(JobImpl job) {
+        job.setGuid(message.getGuid());
+      }
+    }.save();
+    LOG.info("Stored GUID for Job id: {} as '{}'", message.getJobId(), message.getGuid());
+  }
+
+  private void saveDagInformation(final SaveDagInformation message) {
+    if(message.getDagId() == null &&
+        message.getDagName() == null &&
+        message.getApplicationId() == null) {
+      LOG.error("Cannot save Dag Information for job Id: {} as all the properties are null.", message.getJobId());
+      return;
+    }
+    new JobSaver(message.getJobId()) {
+
+      @Override
+      protected void update(JobImpl job) {
+        if (message.getApplicationId() != null) {
+          job.setApplicationId(message.getApplicationId());
+        }
+        if (message.getDagId() != null) {
+          job.setDagId(message.getDagId());
+        }
+        if(message.getDagName() != null) {
+          job.setDagName(message.getDagName());
+        }
+      }
+    }.save();
+    LOG.info("Store Dag Information for job. Job id: {}, dagName: {}, dagId: {}, applicationId: {}", message.getJobId(), message.getDagName(), message.getDagId(), message.getApplicationId());
+  }
+
+  private Long getUpdatedDuration(Long dateSubmitted) {
+    return (System.currentTimeMillis() / MILLIS_IN_SECOND) - (dateSubmitted / MILLIS_IN_SECOND);
+  }
+
+
+  private void checkInactivity() {
+    LOG.debug("Inactivity check, executing status: {}", executing);
+    if (executing) {
+      keepAlive();
+      return;
+    }
+    long current = System.currentTimeMillis();
+    if ((current - lastActivityTimestamp) > actorConfiguration.getInactivityTimeout(MAX_INACTIVITY_INTERVAL)) {
+      // Stop all the sub-actors created
+      cleanUp();
+    }
+  }
+
+  private void checkTerminationInactivity() {
+    if (!isAsync()) {
+      // Should not terminate if job is sync. Will terminate after the job is finished.
+      stopTerminateInactivityScheduler();
+      return;
+    }
+
+    LOG.debug("Termination check, executing status: {}", executing);
+    if (executing) {
+      keepAlive();
+      return;
+    }
+
+    long current = System.currentTimeMillis();
+    if ((current - lastActivityTimestamp) > actorConfiguration.getTerminationTimeout(MAX_TERMINATION_INACTIVITY_INTERVAL)) {
+      cleanUpWithTermination();
+    }
+  }
+
+  private void cleanUp() {
+    if (jobId.isPresent()) {
+      LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId.get());
+    } else {
+      LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name());
+    }
+    this.executing = false;
+    cleanUpStatementAndResultSet();
+    stopInactivityScheduler();
+    parent.tell(new FreeConnector(username, jobId.orNull(), isAsync()), self());
+  }
+
+  private void cleanUpWithTermination() {
+    this.executing = false;
+    LOG.debug("{} :: Cleaning up resources with inactivity for execution.", self().path().name());
+    cleanUpStatementAndResultSet();
+
+    stopInactivityScheduler();
+    stopTerminateInactivityScheduler();
+    parent.tell(new DestroyConnector(username, jobId.orNull(), isAsync()), this.self());
+    self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+  }
+
+
+  /**
+   * Closes the statement and the result set held by the connection delegate.
+   * Does nothing when there is no delegate, matching the null check that
+   * {@code cancelJob} performs before using it.
+   */
+  private void cleanUpStatementAndResultSet() {
+    if (connectionDelegate == null) {
+      return;
+    }
+    connectionDelegate.closeStatement();
+    connectionDelegate.closeResultSet();
+  }
+
+  /**
+   * Starts the periodic termination check, sending a
+   * {@code TerminateInactivityCheck} to this actor immediately and then every
+   * 60 seconds. A scheduler that is still active is cancelled first so that
+   * repeated calls do not leave several timers running.
+   */
+  private void startTerminateInactivityScheduler() {
+    stopTerminateInactivityScheduler();
+    this.terminateActorScheduler = getContext().system().scheduler().schedule(
+      Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
+      this.getSelf(), new TerminateInactivityCheck(), getContext().dispatcher(), null);
+  }
+
+  /**
+   * Cancels the termination check scheduler if one exists and is still active.
+   */
+  private void stopTerminateInactivityScheduler() {
+    if (terminateActorScheduler == null) {
+      return;
+    }
+    if (!terminateActorScheduler.isCancelled()) {
+      terminateActorScheduler.cancel();
+    }
+  }
+
+  private void startInactivityScheduler() {
+    if (inactivityScheduler != null) {
+      inactivityScheduler.cancel();
+    }
+    inactivityScheduler = getContext().system().scheduler().schedule(
+      Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
+      this.self(), new InactivityCheck(), getContext().dispatcher(), null);
+  }
+
+  /**
+   * Cancels the inactivity check scheduler if one exists and is still active.
+   */
+  private void stopInactivityScheduler() {
+    if (inactivityScheduler == null) {
+      return;
+    }
+    if (!inactivityScheduler.isCancelled()) {
+      inactivityScheduler.cancel();
+    }
+  }
+
+  /**
+   * Clears the per-job state: failure information, the cancel flag, the result
+   * set iterator reference and the queue of pending statements.
+   */
+  private void resetToInitialState() {
+    // Failure bookkeeping
+    failure = null;
+    isFailure = false;
+    isCancelCalled = false;
+
+    // Result and pending work
+    resultSetIterator = null;
+    statementQueue = new ArrayDeque<>();
+  }
+
+  /**
+   * Stops both schedulers and disconnects from Hive when the actor stops.
+   * The connectable is only assigned while handling a connect message, so it
+   * may still be null here and is checked before use.
+   */
+  @Override
+  public void postStop() throws Exception {
+    stopInactivityScheduler();
+    stopTerminateInactivityScheduler();
+
+    if (connectable != null && connectable.isOpen()) {
+      connectable.disconnect();
+    }
+  }
+
+  /**
+   * Template for persisting a change to a stored job: loads the job with the
+   * given id, lets the subclass modify it through {@link #update(JobImpl)} and
+   * stores it back.
+   */
+  private abstract class JobSaver {
+    private final String jobId;
+
+    JobSaver(String jobId) {
+      this.jobId = jobId;
+    }
+
+    /**
+     * Loads, updates and stores the job. A missing job is delegated to
+     * {@link #itemNotFound(String)} instead of being propagated.
+     */
+    public void save() {
+      try {
+        JobImpl job = storage.load(JobImpl.class, jobId);
+        update(job);
+        storage.store(JobImpl.class, job);
+      } catch (ItemNotFound e) {
+        itemNotFound(jobId);
+      }
+    }
+
+    /**
+     * Called when the job cannot be found in storage. Override to handle the
+     * situation differently; the default only logs a warning so the miss is
+     * not lost silently.
+     *
+     * @param jobId id of the job that could not be loaded
+     */
+    protected void itemNotFound(String jobId) {
+      LOG.warn("Job with id: {} was not found in storage. Nothing was saved.", jobId);
+    }
+
+    /**
+     * Applies the change to the loaded job before it is stored again.
+     *
+     * @param job the job loaded from storage
+     */
+    protected abstract void update(JobImpl job);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
new file mode 100644
index 0000000..f9c21b4
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import com.google.common.base.Joiner;
+import org.apache.ambari.view.hive20.actor.message.GetMoreLogs;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished;
+import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsApiException;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads the logs for a ExecuteJob from the Statement and writes them into hdfs.
+ */
+public class LogAggregator extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  public static final int AGGREGATION_INTERVAL = 5 * 1000;
+  private final HdfsApi hdfsApi;
+  private final HiveStatement statement;
+  private final String logFile;
+
+  private Cancellable moreLogsScheduler;
+  private ActorRef parent;
+  private boolean hasStartedFetching = false;
+  private boolean shouldFetchMore = true;
+
+  public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+    this.hdfsApi = hdfsApi;
+    this.statement = statement;
+    this.logFile = logFile;
+  }
+
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof StartLogAggregation) {
+      start();
+    }
+
+    if (message instanceof GetMoreLogs) {
+      try {
+        getMoreLogs();
+      } catch (SQLException e) {
+        LOG.error("SQL Error while getting logs. Tried writing to: {}", logFile);
+      } catch (HdfsApiException e) {
+        LOG.warn("HDFS Error while getting writing logs to {}", logFile);
+
+      }
+    }
+  }
+
+  private void start() {
+    parent = this.getSender();
+    hasStartedFetching = false;
+    shouldFetchMore = true;
+    if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
+      moreLogsScheduler.cancel();
+    }
+    this.moreLogsScheduler = getContext().system().scheduler().schedule(
+      Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+      this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+  }
+
+  private void getMoreLogs() throws SQLException, HdfsApiException {
+    List<String> logs = statement.getQueryLog();
+    if (logs.size() > 0 && shouldFetchMore) {
+      String allLogs = Joiner.on("\n").skipNulls().join(logs);
+      HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+      if(!statement.hasMoreLogs()) {
+        shouldFetchMore = false;
+      }
+    } else {
+      // Cancel the timer only when log fetching has been started
+      if(!shouldFetchMore) {
+        moreLogsScheduler.cancel();
+        parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+      }
+    }
+  }
+
+  @Override
+  public void postStop() throws Exception {
+    if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
+      moreLogsScheduler.cancel();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
new file mode 100644
index 0000000..d63b3a0
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.Ping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the Meta Information for Hive Server. Singleton actor which stores several DatabaseManagerActor in memory for
+ * each user and instance name combination. A database manager is created on the first ping for a user and
+ * is stopped when no further ping arrives within the termination delay.
+ */
+public class MetaDataManager extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  /**
+   * Stores the sub database manager actors per user combination
+   */
+  private final Map<String, ActorRef> databaseManagers = new HashMap<>();
+  /**
+   * Pending termination timers, keyed by the same username as {@link #databaseManagers}
+   */
+  private final Map<String, Cancellable> terminationSchedulers = new HashMap<>();
+  private final ViewContext context;
+
+  public MetaDataManager(ViewContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Dispatches ping, terminate and get-databases messages; other payloads are ignored.
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+
+    Object message = hiveMessage.getMessage();
+    if (message instanceof Ping) {
+      handlePing((Ping) message);
+    } else if (message instanceof Terminate) {
+      handleTerminate((Terminate) message);
+    } else if (message instanceof DatabaseManager.GetDatabases) {
+      handleGetDatabases((DatabaseManager.GetDatabases) message);
+    }
+  }
+
+  /**
+   * Creates the database manager for the pinging user if there is none, and (re)schedules its termination.
+   * The username from the message is used as the single key for lookup, registration and scheduling, so
+   * that a manager registered here is found again by later pings and by {@code handleGetDatabases}.
+   */
+  private void handlePing(Ping message) {
+    LOG.info("Ping message received for user: {}, instance: {}", message.getUsername(), message.getInstanceName());
+    final String username = message.getUsername();
+    ActorRef databaseManager = databaseManagers.get(username);
+    if (databaseManager == null) {
+      databaseManager = createDatabaseManager(username, message.getInstanceName());
+      databaseManagers.put(username, databaseManager);
+      // The manager is built from the view context, so the refresh keeps using the context's user.
+      databaseManager.tell(new DatabaseManager.Refresh(context.getUsername()), getSelf());
+    } else {
+      cancelTerminationScheduler(username);
+    }
+    scheduleTermination(username);
+  }
+
+  /**
+   * Stops and unregisters the database manager of the user, if one is still registered.
+   */
+  private void handleTerminate(Terminate message) {
+    ActorRef databaseManager = databaseManagers.remove(message.getUsername());
+    if (databaseManager != null) {
+      getContext().stop(databaseManager);
+    }
+    cancelTerminationScheduler(message.getUsername());
+  }
+
+  /**
+   * Forwards the request to the user's database manager, keeping the original sender.
+   */
+  private void handleGetDatabases(DatabaseManager.GetDatabases message) {
+    String username = message.getUsername();
+    ActorRef databaseManager = databaseManagers.get(username);
+    if(databaseManager != null) {
+      databaseManager.tell(message, getSender());
+    } else {
+      // Not database Manager created. Start the database manager with a ping message
+      // and queue up the GetDatabases call to self
+      getSelf().tell(new Ping(username, context.getInstanceName()), getSender());
+      getSelf().tell(message, getSender());
+    }
+  }
+
+  /**
+   * Removes the termination timer of the user and cancels it if it is still active.
+   */
+  private void cancelTerminationScheduler(String username) {
+    Cancellable cancellable = terminationSchedulers.remove(username);
+    if (!(cancellable == null || cancellable.isCancelled())) {
+      LOG.info("Cancelling termination scheduler");
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * Schedules a {@link Terminate} message for the user in two minutes. A timer that is replaced
+   * in the map is cancelled so it cannot fire later.
+   */
+  private void scheduleTermination(String username) {
+    Cancellable cancellable = context().system().scheduler().scheduleOnce(Duration.create(2, TimeUnit.MINUTES),
+        getSelf(), new Terminate(username), getContext().dispatcher(), getSelf());
+    Cancellable previous = terminationSchedulers.put(username, cancellable);
+    if (previous != null && !previous.isCancelled()) {
+      previous.cancel();
+    }
+  }
+
+  private ActorRef createDatabaseManager(String username, String instanceName) {
+    LOG.info("Creating database manager for username: {}, instance: {}", username, instanceName);
+    return context().actorOf(DatabaseManager.props(context));
+  }
+
+  public static Props props(ViewContext viewContext) {
+    return Props.create(MetaDataManager.class, viewContext);
+  }
+
+  /**
+   * Internal message asking this actor to stop the database manager of a user.
+   * Static so that a scheduled message does not hold a reference to the actor instance.
+   */
+  private static class Terminate {
+    public final String username;
+
+    public Terminate(String username) {
+      this.username = username;
+    }
+
+    public String getUsername() {
+      return username;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
new file mode 100644
index 0000000..7323a0a
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.ConnectionException;
+import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Actor that reads database and table names from Hive through the JDBC
+ * metadata API and reports them to the sender of a {@link RefreshDB} message
+ * as a series of {@link DBRefreshed}, {@link TableRefreshed} and
+ * {@link AllTableRefreshed} messages, or a {@link DBRefreshFailed} on error.
+ */
+public class MetaDataRetriever extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final Connectable connectable;
+
+  public MetaDataRetriever(Connectable connectable) {
+    this.connectable = connectable;
+  }
+
+  /**
+   * Reacts to {@link RefreshDB}; every other payload is ignored.
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof RefreshDB) {
+      handleRefreshDB();
+    }
+  }
+
+  /**
+   * Runs a full refresh and reports any connection or SQL error to the sender.
+   */
+  private void handleRefreshDB() {
+    try {
+      refreshDatabaseInfos();
+    } catch (ConnectionException | SQLException e) {
+      LOG.error("Failed to update the complete database information. Exception: {}", e);
+      getSender().tell(new DBRefreshFailed(e), getSelf());
+    }
+  }
+
+  /**
+   * Returns the Hive connection, opening the connectable first if needed.
+   *
+   * @throws ConnectionException if the connectable cannot be opened
+   * @throws SQLException        if the connectable holds no connection afterwards; raised as a
+   *                             checked exception so the refresh is reported as failed to the
+   *                             sender instead of failing the actor with an unchecked exception
+   */
+  private HiveConnection getHiveConnection() throws ConnectionException, SQLException {
+    if (!connectable.isOpen()) {
+      connectable.connect();
+    }
+    Optional<HiveConnection> connectionOptional = connectable.getConnection();
+    if (!connectionOptional.isPresent()) {
+      throw new SQLException("Hive connection is not created");
+    }
+    return connectionOptional.get();
+  }
+
+  /**
+   * Reads all schema names, reports them in one {@link DBRefreshed} message and then
+   * refreshes the tables of each database in turn.
+   */
+  private void refreshDatabaseInfos() throws ConnectionException, SQLException {
+    HiveConnection connection = getHiveConnection();
+    Set<DatabaseInfo> infos = new HashSet<>();
+    try (ResultSet schemas = connection.getMetaData().getSchemas()) {
+      while (schemas.next()) {
+        DatabaseInfo info = new DatabaseInfo(schemas.getString(1));
+        infos.add(info);
+      }
+    }
+
+    getSender().tell(new DBRefreshed(infos), getSelf());
+
+    for (DatabaseInfo info : infos) {
+      refreshTablesInfo(info.getName());
+    }
+  }
+
+  /**
+   * Reports every table of the database in its own {@link TableRefreshed} message,
+   * followed by one {@link AllTableRefreshed} message.
+   */
+  private void refreshTablesInfo(String database) throws ConnectionException, SQLException {
+    HiveConnection connection = getHiveConnection();
+    try (ResultSet tables = connection.getMetaData().getTables("", database, null, null)) {
+      while (tables.next()) {
+        // Columns 3 and 4 of DatabaseMetaData.getTables are TABLE_NAME and TABLE_TYPE.
+        TableInfo info = new TableInfo(tables.getString(3), tables.getString(4));
+        getSender().tell(new TableRefreshed(info, database), getSelf());
+      }
+    }
+    getSender().tell(new AllTableRefreshed(database), getSelf());
+  }
+
+  public static  Props props(Connectable connectable) {
+    return Props.create(MetaDataRetriever.class, connectable);
+  }
+
+
+  /** Request to refresh all database and table information. */
+  public static class RefreshDB {
+
+  }
+
+  /** Reply carrying the set of databases that was read. */
+  public static class DBRefreshed {
+    private final Set<DatabaseInfo> databases;
+
+    public DBRefreshed(Set<DatabaseInfo> databases) {
+      this.databases = databases;
+    }
+
+    public Set<DatabaseInfo> getDatabases() {
+      return databases;
+    }
+  }
+
+  /** Reply sent when the refresh failed, carrying the cause. */
+  public static class DBRefreshFailed {
+    private final Exception exception;
+
+    public DBRefreshFailed(Exception exception) {
+      this.exception = exception;
+    }
+
+    public Exception getException() {
+      return exception;
+    }
+  }
+
+  /** Reply carrying one table and the database it belongs to. */
+  public static  class TableRefreshed {
+    private final TableInfo table;
+    private final String database;
+
+    public TableRefreshed(TableInfo table, String database) {
+      this.table = table;
+      this.database = database;
+    }
+
+    public TableInfo getTable() {
+      return table;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+  }
+
+  /** Reply marking that all tables of a database have been reported. */
+  public static class AllTableRefreshed {
+    private final String database;
+
+    public AllTableRefreshed(String database) {
+      this.database = database;
+    }
+
+    public String getDatabase() {
+      return database;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
new file mode 100644
index 0000000..f751d8f
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.ConnectionDelegate;
+import org.apache.ambari.view.hive20.actor.message.Connect;
+import org.apache.ambari.view.hive20.actor.message.ExecuteJob;
+import org.apache.ambari.view.hive20.actor.message.FetchError;
+import org.apache.ambari.view.hive20.actor.message.FetchResult;
+import org.apache.ambari.view.hive20.actor.message.HiveJob;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.JobRejected;
+import org.apache.ambari.view.hive20.actor.message.RegisterActor;
+import org.apache.ambari.view.hive20.actor.message.SQLStatementJob;
+import org.apache.ambari.view.hive20.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive20.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive20.internal.ContextSupplier;
+import org.apache.ambari.view.hive20.persistence.Storage;
+import org.apache.ambari.view.hive20.utils.LoggingOutputStream;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections4.map.HashedMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Router actor which controls the operations. It delegates the operations to the underlying
+ * connector actors and stores the state for them.
+ */
+public class OperationController extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final ActorSystem system;
+  private final ActorRef deathWatch;
+  private final ContextSupplier<ConnectionDelegate> connectionSupplier;
+  private final ContextSupplier<Storage> storageSupplier;
+  private final ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier;
+
+  /**
+   * Stores, per user, the connectors for async jobs which are currently idle and can be reused
+   */
+  private final Map<String, Queue<ActorRef>> asyncAvailableConnections;
+
+  /**
+   * Stores, per user, the connectors for sync jobs which are currently idle and can be reused
+   */
+  private final Map<String, Queue<ActorRef>> syncAvailableConnections;
+
+
+  /**
+   * Store the connection per user/per job which are currently working.
+   */
+  private final Map<String, Map<String, ActorRef>> asyncBusyConnections;
+
+  /**
+   * Stores, per user, the connectors which are currently busy executing sync jobs
+   * like fetching databases, tables etc.
+   */
+  private final Map<String, Set<ActorRef>> syncBusyConnections;
+
+
+  private final ViewContext context;
+
+  /**
+   * Creates the controller with its collaborators and with empty connection pools.
+   *
+   * @param system             actor system used to create the connector actors
+   * @param deathWatch         actor with which newly created connectors are registered
+   * @param context            view context handed to the suppliers and to the connectors
+   * @param connectionSupplier supplies the {@link ConnectionDelegate} for a connector
+   * @param storageSupplier    supplies the {@link Storage} for a connector
+   * @param hdfsApiSupplier    supplies the optional {@link HdfsApi} for a connector
+   */
+  public OperationController(ActorSystem system,
+                             ActorRef deathWatch,
+                             ViewContext context,
+                             ContextSupplier<ConnectionDelegate> connectionSupplier,
+                             ContextSupplier<Storage> storageSupplier,
+                             ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
+    // Collaborators handed in by the creator of this actor.
+    this.context = context;
+    this.system = system;
+    this.deathWatch = deathWatch;
+    this.hdfsApiSupplier = hdfsApiSupplier;
+    this.storageSupplier = storageSupplier;
+    this.connectionSupplier = connectionSupplier;
+
+    // All pools start out empty and are filled lazily per user.
+    this.syncBusyConnections = new HashMap<>();
+    this.asyncBusyConnections = new HashedMap<>();
+    this.syncAvailableConnections = new HashMap<>();
+    this.asyncAvailableConnections = new HashMap<>();
+  }
+
+  /**
+   * Routes the payload of the incoming message to the matching private handler. Payloads of
+   * an unknown type, and {@link ExecuteJob}s whose job is neither ASYNC nor SYNC, are ignored.
+   *
+   * @param hiveMessage wrapper around the actual payload
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    final Object payload = hiveMessage.getMessage();
+
+    if (payload instanceof ExecuteJob) {
+      final ExecuteJob execute = (ExecuteJob) payload;
+      final HiveJob.Type jobType = execute.getJob().getType();
+      if (jobType == HiveJob.Type.ASYNC) {
+        sendJob(execute.getConnect(), (SQLStatementJob) execute.getJob());
+      } else if (jobType == HiveJob.Type.SYNC) {
+        sendSyncJob(execute.getConnect(), execute.getJob());
+      }
+    }
+
+    // Requests which refer to a job that is already running.
+    if (payload instanceof CancelJob) {
+      cancelJob((CancelJob) payload);
+    }
+    if (payload instanceof FetchResult) {
+      fetchResultActorRef((FetchResult) payload);
+    }
+    if (payload instanceof FetchError) {
+      fetchError((FetchError) payload);
+    }
+
+    // Life cycle notifications for the connectors.
+    if (payload instanceof FreeConnector) {
+      freeConnector((FreeConnector) payload);
+    }
+    if (payload instanceof DestroyConnector) {
+      destroyConnector((DestroyConnector) payload);
+    }
+
+    if (payload instanceof SaveDagInformation) {
+      saveDagInformation((SaveDagInformation) payload);
+    }
+  }
+
+  /**
+   * Forwards a cancel request to the connector which is running the job.
+   * <p>
+   * When no busy connector is registered for the user/job pair - including the case where the
+   * user has no entry in the busy pool at all - the problem is logged and a {@link FetchFailed}
+   * is sent back to the requester.
+   *
+   * @param message cancel request carrying the job id and the user name
+   */
+  private void cancelJob(CancelJob message) {
+    String jobId = message.getJobId();
+    String username = message.getUsername();
+    // A user who never started a job has no entry in the map, so the lookup must be null-safe.
+    Map<String, ActorRef> userConnections = asyncBusyConnections.get(username);
+    ActorRef actorRef = userConnections == null ? null : userConnections.get(jobId);
+    if (actorRef != null) {
+      actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot cancel job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
+    }
+  }
+
+  /**
+   * Forwards the DAG information to the connector which is running the job for the user of the
+   * current view context.
+   * <p>
+   * When no busy connector is registered for that user/job pair - including the case where the
+   * user has no entry in the busy pool at all - the problem is only logged; nothing is sent back.
+   *
+   * @param message DAG information together with the id of the job it belongs to
+   */
+  private void saveDagInformation(SaveDagInformation message) {
+    // A user who never started a job has no entry in the map, so the lookup must be null-safe.
+    Map<String, ActorRef> userConnections = asyncBusyConnections.get(context.getUsername());
+    ActorRef jdbcConnection = userConnections == null ? null : userConnections.get(message.getJobId());
+    if(jdbcConnection != null) {
+      jdbcConnection.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot update Dag Information for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+    }
+  }
+
+  /**
+   * Forwards an error-fetch request to the connector which is running the job.
+   * <p>
+   * When no busy connector is registered for the user/job pair - including the case where the
+   * user has no entry in the busy pool at all - the problem is logged and a {@link FetchFailed}
+   * is sent back to the requester.
+   *
+   * @param message request carrying the job id and the user name
+   */
+  private void fetchError(FetchError message) {
+    String jobId = message.getJobId();
+    String username = message.getUsername();
+    // A user who never started a job has no entry in the map, so the lookup must be null-safe.
+    Map<String, ActorRef> userConnections = asyncBusyConnections.get(username);
+    ActorRef actorRef = userConnections == null ? null : userConnections.get(jobId);
+    if (actorRef != null) {
+      actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch error for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
+    }
+  }
+
+  /**
+   * Forwards a result-fetch request to the connector which is running the job.
+   * <p>
+   * When no busy connector is registered for the user/job pair - including the case where the
+   * user has no entry in the busy pool at all - the problem is logged and a {@link FetchFailed}
+   * is sent back to the requester.
+   *
+   * @param message request carrying the job id and the user name
+   */
+  private void fetchResultActorRef(FetchResult message) {
+    String username = message.getUsername();
+    String jobId = message.getJobId();
+    // A user who never started a job has no entry in the map, so the lookup must be null-safe.
+    Map<String, ActorRef> userConnections = asyncBusyConnections.get(username);
+    ActorRef actorRef = userConnections == null ? null : userConnections.get(jobId);
+    if (actorRef != null) {
+      actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch result for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
+    }
+  }
+
+  /**
+   * Dispatches an async job to a connector and records that connector as busy for the
+   * user/job pair.
+   * <p>
+   * An idle connector from the async pool is reused when one exists, otherwise a new
+   * {@link JdbcConnector} actor is created and registered with the death watch. The job is
+   * rejected with a {@link JobRejected} message, and nothing is dispatched, when
+   * <ul>
+   * <li>a job with the same id is already in progress for the user, or</li>
+   * <li>a new connector is needed but no {@link HdfsApi} is available.</li>
+   * </ul>
+   *
+   * @param connect connection details, sent to the connector ahead of the job
+   * @param job     the statement job to execute; its job id must be present
+   */
+  private void sendJob(Connect connect, SQLStatementJob job) {
+    String username = job.getUsername();
+    String jobId = job.getJobId().get();
+
+    // Check for a duplicate first, before a connector is taken out of the pool. Otherwise the
+    // connector would be lost and the duplicate would still be executed.
+    Map<String, ActorRef> busyActors = asyncBusyConnections.get(username);
+    if (busyActors != null && busyActors.containsKey(jobId)) {
+      // Reject this as with the same jobId one connection is already in progress.
+      sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
+      return;
+    }
+
+    // Check if there is available actors to process this
+    ActorRef subActor = getActorRefFromAsyncPool(username);
+    if (subActor == null) {
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, jobId, "Failed to connect to Hive."), self());
+        return;
+      }
+      HdfsApi hdfsApi = hdfsApiOptional.get();
+
+      subActor = system.actorOf(
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":asyncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
+    }
+
+    // Remember which connector works on the job so that later requests can be routed to it.
+    if (busyActors == null) {
+      busyActors = new HashMap<>();
+      asyncBusyConnections.put(username, busyActors);
+    }
+    busyActors.put(jobId, subActor);
+
+    // set up the connect with ExecuteJob id for terminations
+    subActor.tell(connect, self());
+    subActor.tell(job, self());
+  }
+
+  /**
+   * Takes an idle connector for the user out of the pool of sync connectors.
+   *
+   * @param username user whose pool is consulted
+   * @return an idle connector, or {@code null} when none is available
+   */
+  private ActorRef getActorRefFromSyncPool(String username) {
+    return getActorRefFromPool(syncAvailableConnections, username);
+  }
+
+  /**
+   * Takes an idle connector for the user out of the pool of async connectors.
+   *
+   * @param username user whose pool is consulted
+   * @return an idle connector, or {@code null} when none is available
+   */
+  private ActorRef getActorRefFromAsyncPool(String username) {
+    return getActorRefFromPool(asyncAvailableConnections, username);
+  }
+
+  /**
+   * Takes the next idle connector of the user out of the given pool. An empty queue is
+   * registered for users which are not yet known to the pool.
+   *
+   * @param pool     idle connectors keyed by user name
+   * @param username user whose queue is consulted
+   * @return the head of the user's queue, or {@code null} when there is no idle connector
+   */
+  private ActorRef getActorRefFromPool(Map<String, Queue<ActorRef>> pool, String username) {
+    Queue<ActorRef> idleActors = pool.get(username);
+    if (idleActors == null) {
+      // First request of this user: create the queue, there is nothing to hand out yet.
+      pool.put(username, new LinkedList<ActorRef>());
+      return null;
+    }
+    // poll() yields null for an empty queue
+    return idleActors.poll();
+  }
+
+  private void sendSyncJob(Connect connect, HiveJob job) {
+    String username = job.getUsername();
+    ActorRef subActor = null;
+    // Check if there is available actors to process this
+    subActor = getActorRefFromSyncPool(username);
+
+    if (subActor == null) {
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
+        return;
+      }
+      HdfsApi hdfsApi = hdfsApiOptional.get();
+
+      subActor = system.actorOf(
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":syncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
+    }
+
+    if (syncBusyConnections.containsKey(username)) {
+      Set<ActorRef> actors = syncBusyConnections.get(username);
+      actors.add(subActor);
+    } else {
+      LinkedHashSet<ActorRef> actors = new LinkedHashSet<>();
+      actors.add(subActor);
+      syncBusyConnections.put(username, actors);
+    }
+
+    // Termination requires that the ref is known in case of sync jobs
+    subActor.tell(connect, sender());
+    subActor.tell(job, sender());
+  }
+
+
+  private void destroyConnector(DestroyConnector message) {
+    ActorRef sender = getSender();
+    if (message.isForAsync()) {
+      removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+      removeFromASyncAvailable(message.getUsername(), sender);
+    } else {
+      removeFromSyncBusyPool(message.getUsername(), sender);
+      removeFromSyncAvailable(message.getUsername(), sender);
+    }
+    logMaps();
+  }
+
+  private void freeConnector(FreeConnector message) {
+    ActorRef sender = getSender();
+    if (message.isForAsync()) {
+      LOG.info("About to free connector for job {} and user {}", message.getJobId(), message.getUsername());
+      Optional<ActorRef> refOptional = removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+      if (refOptional.isPresent()) {
+        addToAsyncAvailable(message.getUsername(), refOptional.get());
+      }
+      return;
+    }
+
+    // Was a sync job, remove from sync pool
+    LOG.info("About to free sync connector for user {}", message.getUsername());
+    Optional<ActorRef> refOptional = removeFromSyncBusyPool(message.getUsername(), sender);
+    if (refOptional.isPresent()) {
+      addToSyncAvailable(message.getUsername(), refOptional.get());
+    }
+
+
+    logMaps();
+
+  }
+
+  /**
+   * Dumps the content of the four connection pools to the debug log.
+   * <p>
+   * Nothing is built or printed when debug logging is disabled. The logging stream is always
+   * closed, even when printing fails.
+   */
+  private void logMaps() {
+    if (!LOG.isDebugEnabled()) {
+      // Avoid walking all the pools when the output would not be logged anyway.
+      return;
+    }
+
+    LOG.debug("Pool status");
+    LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.DEBUG);
+    // One shared print stream is sufficient for all the dumps.
+    PrintStream printer = new PrintStream(out);
+    try {
+      MapUtils.debugPrint(printer, "Busy Async connections", asyncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Async connections", asyncAvailableConnections);
+      MapUtils.debugPrint(printer, "Busy Sync connections", syncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Sync connections", syncAvailableConnections);
+      printer.flush();
+    } finally {
+      try {
+        out.close();
+      } catch (IOException e) {
+        LOG.warn("Cannot close Logging output stream, this may lead to leaks", e);
+      }
+    }
+  }
+
+  /**
+   * Removes a connector from the set of busy sync connectors of the user.
+   *
+   * @param userName  user whose busy connectors are consulted
+   * @param refToFree connector to remove
+   * @return the connector when it was registered as busy and has been removed, otherwise an
+   * absent optional, so that callers do not put an unknown or already freed connector into
+   * the available pool a second time
+   */
+  private Optional<ActorRef> removeFromSyncBusyPool(String userName, ActorRef refToFree) {
+    Set<ActorRef> actorRefs = syncBusyConnections.get(userName);
+    // Set.remove reports whether the connector was really part of the busy set.
+    if (actorRefs != null && actorRefs.remove(refToFree)) {
+      return Optional.of(refToFree);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Removes the connector registered as busy for the given user/job pair.
+   *
+   * @param username user who owns the job
+   * @param jobId    id of the job the connector was working on
+   * @return the removed connector, or an absent optional when none was registered
+   */
+  private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
+    Map<String, ActorRef> jobsOfUser = asyncBusyConnections.get(username);
+    // Map.remove hands back the previous mapping, or null when there was none.
+    ActorRef released = (jobsOfUser == null) ? null : jobsOfUser.remove(jobId);
+    return Optional.fromNullable(released);
+  }
+
+  /**
+   * Puts a connector into the pool of idle async connectors of the user.
+   *
+   * @param username user who owns the connector
+   * @param actor    connector to make available
+   */
+  private void addToAsyncAvailable(String username, ActorRef actor) {
+    addToAvailable(asyncAvailableConnections, username, actor);
+  }
+
+  /**
+   * Puts a connector into the pool of idle sync connectors of the user.
+   *
+   * @param username user who owns the connector
+   * @param actor    connector to make available
+   */
+  private void addToSyncAvailable(String username, ActorRef actor) {
+    addToAvailable(syncAvailableConnections, username, actor);
+  }
+
+  /**
+   * Appends a connector to the user's queue in the given pool. The queue is created when the
+   * user is not yet known to the pool.
+   *
+   * @param pool     idle connectors keyed by user name
+   * @param username user who owns the connector
+   * @param actor    connector to make available
+   */
+  private void addToAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef actor) {
+    Queue<ActorRef> idleActors = pool.get(username);
+    if (idleActors == null) {
+      idleActors = new LinkedList<ActorRef>();
+      pool.put(username, idleActors);
+    }
+    idleActors.add(actor);
+  }
+
+  /**
+   * Removes a connector from the pool of idle async connectors of the user.
+   *
+   * @param username user who owns the connector
+   * @param sender   connector to remove
+   */
+  private void removeFromASyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(asyncAvailableConnections, username, sender);
+  }
+
+  /**
+   * Removes a connector from the pool of idle sync connectors of the user.
+   *
+   * @param username user who owns the connector
+   * @param sender   connector to remove
+   */
+  private void removeFromSyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(syncAvailableConnections, username, sender);
+  }
+
+  /**
+   * Removes a connector from the user's queue in the given pool. Does nothing when the user is
+   * not known to the pool.
+   *
+   * @param pool     idle connectors keyed by user name
+   * @param username user who owns the connector
+   * @param sender   connector to remove
+   */
+  private void removeFromAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef sender) {
+    Queue<ActorRef> idleActors = pool.get(username);
+    if (idleActors != null) {
+      idleActors.remove(sender);
+    }
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
new file mode 100644
index 0000000..4b4a407
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.Lists;
+import org.apache.ambari.view.hive20.actor.message.CursorReset;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.ResetCursor;
+import org.apache.ambari.view.hive20.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive20.actor.message.job.Next;
+import org.apache.ambari.view.hive20.actor.message.job.NoMoreItems;
+import org.apache.ambari.view.hive20.actor.message.job.Result;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.ColumnDescriptionShort;
+import org.apache.ambari.view.hive20.client.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+public class ResultSetIterator extends HiveActor {
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private static final int DEFAULT_BATCH_SIZE = 100;
+  public static final String NULL = "NULL";
+
+  private final ActorRef parent;
+  private final ResultSet resultSet;
+  private final int batchSize;
+
+  private List<ColumnDescription> columnDescriptions;
+  private int columnCount;
+  boolean async = false;
+  private boolean metaDataFetched = false;
+
+  /**
+   * Creates an iterator actor over the given result set.
+   *
+   * @param parent    actor which receives the keep-alive and clean-up notifications
+   * @param resultSet result set to read the rows from
+   * @param batchSize upper limit for the number of rows read per batch
+   * @param isAsync   when {@code false}, a clean-up is requested from the parent as soon as the
+   *                  end of the result set is reached
+   */
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize, boolean isAsync) {
+    this.parent = parent;
+    this.resultSet = resultSet;
+    this.batchSize = batchSize;
+    this.async = isAsync;
+  }
+
+  /**
+   * Creates an async iterator actor which uses the default batch size.
+   *
+   * @param parent    actor which receives the keep-alive and clean-up notifications
+   * @param resultSet result set to read the rows from
+   */
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
+    this(parent, resultSet, DEFAULT_BATCH_SIZE, true);
+  }
+
+  /**
+   * Creates an iterator actor which uses the default batch size.
+   *
+   * @param parent    actor which receives the keep-alive and clean-up notifications
+   * @param resultSet result set to read the rows from
+   * @param isAsync   when {@code false}, a clean-up is requested from the parent as soon as the
+   *                  end of the result set is reached
+   */
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, boolean isAsync) {
+    this(parent, resultSet, DEFAULT_BATCH_SIZE, isAsync);
+  }
+
+  /**
+   * Handles {@link Next}, {@link ResetCursor} and {@link KeepAlive} payloads. Any other payload
+   * only triggers the keep-alive which is sent for every incoming message.
+   *
+   * @param hiveMessage wrapper around the actual payload
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    // Every incoming message notifies the parent that this iterator is still in use.
+    sendKeepAlive();
+    Object message = hiveMessage.getMessage();
+    if (message instanceof Next) {
+      getNext();
+    }
+    if (message instanceof ResetCursor) {
+      resetResultSet();
+    }
+
+    if (message instanceof KeepAlive) {
+      // NOTE(review): a keep-alive was already sent at the top of this method, so a KeepAlive
+      // message results in two notifications to the parent - confirm this is intended.
+      sendKeepAlive();
+    }
+  }
+
+  private void resetResultSet() {
+    try {
+      resultSet.beforeFirst();
+      sender().tell(new CursorReset(), self());
+    } catch (SQLException e) {
+      LOG.error("Failed to reset the cursor", e);
+      sender().tell(new FetchFailed("Failed to reset the cursor", e), self());
+      cleanUpResources();
+    }
+  }
+
+  /**
+   * Sends a {@link KeepAlive} to the parent actor.
+   */
+  private void sendKeepAlive() {
+    LOG.debug("Sending a keep alive to {}", parent);
+    parent.tell(new KeepAlive(), self());
+  }
+
+  /**
+   * Reads the next batch of at most {@code batchSize} rows and sends it to the requester.
+   * <p>
+   * The requester receives exactly one reply:
+   * <ul>
+   * <li>{@link Result} with the rows and the column descriptions, when rows were read,</li>
+   * <li>{@link NoMoreItems} when no row was read; for non-async iterators a clean-up is
+   * requested from the parent as well,</li>
+   * <li>{@link FetchFailed} when reading the metadata or the rows failed; a clean-up is
+   * requested from the parent as well.</li>
+   * </ul>
+   */
+  private void getNext() {
+    if (!metaDataFetched) {
+      try {
+        initialize();
+      } catch (SQLException ex) {
+        LOG.error("Failed to fetch metadata for the ResultSet", ex);
+        sender().tell(new FetchFailed("Failed to get metadata for ResultSet", ex), self());
+        cleanUpResources();
+        // Rows cannot be built without the metadata and the requester has got its answer,
+        // so do not go on and send a second reply.
+        return;
+      }
+    }
+
+    List<Row> rows = Lists.newArrayList();
+    int index = 0;
+    try {
+      // The batch limit has to be checked first: ResultSet.next() moves the cursor, so calling
+      // it for a batch which is already full would silently drop that row.
+      while (index < batchSize && resultSet.next()) {
+        index++;
+        rows.add(getRowFromResultSet(resultSet));
+      }
+
+      if (index == 0) {
+        // We have hit end of resultSet
+        sender().tell(new NoMoreItems(), self());
+        if(!async) {
+          cleanUpResources();
+        }
+      } else {
+        Result result = new Result(rows, columnDescriptions);
+        sender().tell(result, self());
+      }
+
+    } catch (SQLException ex) {
+      LOG.error("Failed to fetch next batch for the Resultset", ex);
+      sender().tell(new FetchFailed("Failed to fetch next batch for the Resultset", ex), self());
+      cleanUpResources();
+    }
+  }
+
+  /**
+   * Sends a {@link CleanUp} to the parent actor.
+   */
+  private void cleanUpResources() {
+    parent.tell(new CleanUp(), self());
+  }
+
+  private Row getRowFromResultSet(ResultSet resultSet) throws SQLException {
+    Object[] values = new Object[columnCount];
+    for (int i = 0; i < columnCount; i++) {
+      values[i] = resultSet.getObject(i + 1);
+    }
+    return new Row(values);
+  }
+
+  /**
+   * Reads the column count and the column descriptions from the metadata of the result set.
+   * <p>
+   * The state of this actor is only updated, and {@code metaDataFetched} only set, after all
+   * metadata has been read successfully. A failure therefore leaves no partial state behind.
+   *
+   * @throws SQLException when the metadata cannot be read
+   */
+  private void initialize() throws SQLException {
+    ResultSetMetaData metaData = resultSet.getMetaData();
+    int count = metaData.getColumnCount();
+    List<ColumnDescription> descriptions = Lists.newArrayList();
+    for (int i = 1; i <= count; i++) {
+      String columnName = metaData.getColumnName(i);
+      String typeName = metaData.getColumnTypeName(i);
+      ColumnDescription description = new ColumnDescriptionShort(columnName, typeName, i);
+      descriptions.add(description);
+    }
+
+    // Publish the result only now that nothing can fail any more.
+    columnCount = count;
+    columnDescriptions = descriptions;
+    metaDataFetched = true;
+  }
+}