Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:05:00 UTC
[19/50] [abbrv] ambari git commit: AMBARI-19321 : Hive View 2.0 -
Minimal view for Hive which includes new UI changes. Also made changes in
poms as required (nitirajrathore)
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
new file mode 100644
index 0000000..37f24d2
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks table changes for a single database and notifies a per-table TableChangeNotifier child for each change.
+ */
+public class DatabaseChangeNotifier extends HiveActor {
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private String currentDatabaseName;
+ private Map<String, TableWrapper> tables = new HashMap<>();
+ private Map<String, TableInfo> newTables = new HashMap<>();
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if(message instanceof DatabaseAdded) {
+ handleDatabaseAdded((DatabaseAdded) message);
+ } else if (message instanceof DatabaseRemoved) {
+ handleDatabaseRemoved((DatabaseRemoved) message);
+ } else if (message instanceof TableUpdated) {
+ handleTableUpdated((TableUpdated) message);
+ } else if (message instanceof AllTablesUpdated) {
+ handleAllTableUpdated((AllTablesUpdated) message);
+ }
+ }
+
+ private void handleDatabaseAdded(DatabaseAdded message) {
+ LOG.info("Database Added: {}", message.name);
+ currentDatabaseName = message.name;
+ // TODO: Send event to eventbus
+ }
+
+ private void handleDatabaseRemoved(DatabaseRemoved message) {
+ LOG.info("Database Removed: {}", message.name);
+ // TODO: Send event to eventbus
+ }
+
+ private void handleTableUpdated(TableUpdated message) {
+ LOG.info("XXXXX: table xxxx. Size: {}", newTables.size());
+ newTables.put(message.info.getName(), message.info);
+ }
+
+ private void handleAllTableUpdated(AllTablesUpdated message) {
+ Set<String> oldTableNames = new HashSet<>(tables.keySet());
+ Set<String> newTableNames = new HashSet<>(newTables.keySet());
+
+ Set<String> tablesAdded = Sets.difference(newTableNames, oldTableNames);
+ Set<String> tablesRemoved = Sets.difference(oldTableNames, newTableNames);
+ Set<String> tablesUpdated = Sets.intersection(oldTableNames, newTableNames);
+
+ updateTablesAdded(tablesAdded);
+ updateTablesRemoved(tablesRemoved);
+ updateTablesUpdated(tablesUpdated);
+ newTables.clear();
+ }
+
+ private void updateTablesAdded(Set<String> tablesAdded) {
+ for (String tableName: tablesAdded) {
+ TableWrapper wrapper = new TableWrapper(tableName);
+ tables.put(tableName, wrapper);
+ wrapper.getTableNotifier().tell(new TableChangeNotifier.TableAdded(newTables.get(tableName)), getSelf());
+ }
+ }
+
+ private void updateTablesRemoved(Set<String> tablesRemoved) {
+ for(String tableName: tablesRemoved) {
+ TableWrapper tableWrapper = tables.remove(tableName);
+ tableWrapper.getTableNotifier().tell(new TableChangeNotifier.TableRemoved(tableName), getSelf());
+ tableWrapper.getTableNotifier().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ private void updateTablesUpdated(Set<String> tablesUpdated) {
+ for(String tableName: tablesUpdated) {
+ TableWrapper tableWrapper = tables.get(tableName);
+ // TODO: Check what needs to be done here.
+ }
+ }
+
+ public static Props props() {
+ return Props.create(DatabaseChangeNotifier.class);
+ }
+
+ public class TableWrapper {
+ private final String tableName;
+ private final ActorRef tableNotifier;
+
+ private TableWrapper(String tableName) {
+ this.tableName = tableName;
+ this.tableNotifier = getContext().actorOf(TableChangeNotifier.props());
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public ActorRef getTableNotifier() {
+ return tableNotifier;
+ }
+ }
+
+ public static class DatabaseAdded {
+ private final String name;
+
+ public DatabaseAdded(String name) {
+ this.name = name;
+ }
+ }
+
+
+ public static class DatabaseRemoved {
+ private final String name;
+
+ public DatabaseRemoved(String name) {
+ this.name = name;
+ }
+ }
+
+ public static class TableUpdated {
+ private final TableInfo info;
+
+ public TableUpdated(TableInfo info) {
+ this.info = info;
+ }
+ }
+
+ public static class AllTablesUpdated {
+ private final String database;
+
+ public AllTablesUpdated(String database) {
+ this.database = database;
+ }
+ }
+
+
+}
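
The handleAllTableUpdated() flow above diffs the previous and freshly fetched table-name sets with Guava: names only in the new set were added, names only in the old set were removed, and the intersection is treated as possibly updated. A minimal standalone sketch of that diffing step (the table names are made up for illustration):

import com.google.common.collect.Sets;
import java.util.Set;

public class TableDiffSketch {
  public static void main(String[] args) {
    Set<String> oldTableNames = Sets.newHashSet("orders", "customers");
    Set<String> newTableNames = Sets.newHashSet("customers", "invoices");

    Set<String> added = Sets.difference(newTableNames, oldTableNames);      // [invoices]
    Set<String> removed = Sets.difference(oldTableNames, newTableNames);    // [orders]
    Set<String> retained = Sets.intersection(oldTableNames, newTableNames); // [customers]

    System.out.println("added=" + added + " removed=" + removed + " possibly updated=" + retained);
  }
}
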
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
new file mode 100644
index 0000000..6dc4ad9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.AuthParams;
+import org.apache.ambari.view.hive20.ConnectionFactory;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.client.ConnectionConfig;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.HiveConnectionWrapper;
+import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages database-related state: queries Hive for the list of databases and then manages the state of each database.
+ * Also periodically refreshes the list of databases by querying Hive.
+ */
+public class DatabaseManager extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private final Connectable connectable;
+
+ private final ActorRef metaDataRetriever;
+ private final String username;
+
+ private boolean refreshInProgress = false;
+ private boolean selfRefreshQueued = false;
+
+ private Map<String, DatabaseWrapper> databases = new HashMap<>();
+ private Set<String> databasesToUpdate;
+
+
+ public DatabaseManager(String username, Connectable connectable) {
+ this.username = username;
+ this.connectable = connectable;
+ metaDataRetriever = getContext().actorOf(MetaDataRetriever.props(connectable));
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+
+ Object message = hiveMessage.getMessage();
+ if (message instanceof Refresh) {
+ handleRefresh();
+ } else if (message instanceof SelfRefresh) {
+ handleSelfRefresh();
+ } else if (message instanceof MetaDataRetriever.DBRefreshed) {
+ handleDBRefreshed((MetaDataRetriever.DBRefreshed) message);
+ } else if (message instanceof MetaDataRetriever.TableRefreshed) {
+ handleTableRefreshed((MetaDataRetriever.TableRefreshed) message);
+ } else if (message instanceof MetaDataRetriever.AllTableRefreshed) {
+ handleAllTableRefreshed((MetaDataRetriever.AllTableRefreshed) message);
+ } else if (message instanceof GetDatabases) {
+ handleGetDatabases((GetDatabases) message);
+ }
+
+ }
+
+ private void handleSelfRefresh() {
+ if (refreshInProgress) {
+ getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+ getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf());
+ } else {
+ selfRefreshQueued = false;
+ refresh();
+ }
+ }
+
+ private void handleRefresh() {
+ if (refreshInProgress && selfRefreshQueued) {
+ return; // Do not honor the refresh message when a refresh is in progress and a self-refresh is already queued in the mailbox.
+ } else if (refreshInProgress) {
+ selfRefreshQueued = true; // A refresh is in progress; queue up exactly one self-refresh message.
+ getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+ getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf());
+ } else {
+ refresh();
+ }
+ }
+
+ private void handleDBRefreshed(MetaDataRetriever.DBRefreshed message) {
+ Set<DatabaseInfo> databasesInfos = message.getDatabases();
+ Set<String> currentDatabases = new HashSet<>(databases.keySet());
+ Set<String> newDatabases = FluentIterable.from(databasesInfos).transform(new Function<DatabaseInfo, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable DatabaseInfo databaseInfo) {
+ return databaseInfo.getName();
+ }
+ }).toSet();
+
+ databasesToUpdate = new HashSet<>(newDatabases);
+
+ Set<String> databasesAdded = Sets.difference(newDatabases, currentDatabases);
+ Set<String> databasesRemoved = Sets.difference(currentDatabases, newDatabases);
+
+ updateDatabasesAdded(databasesAdded, databasesInfos);
+ updateDatabasesRemoved(databasesRemoved);
+ }
+
+ private void updateDatabasesAdded(Set<String> databasesAdded, Set<DatabaseInfo> databasesInfos) {
+ for (DatabaseInfo info : databasesInfos) {
+ if (databasesAdded.contains(info.getName())) {
+ DatabaseWrapper wrapper = new DatabaseWrapper(info);
+ databases.put(info.getName(), wrapper);
+ wrapper.getDatabaseNotifier().tell(new DatabaseChangeNotifier.DatabaseAdded(info.getName()), getSelf());
+ }
+ }
+ }
+
+ private void updateDatabasesRemoved(Set<String> databasesRemoved) {
+ for (String database : databasesRemoved) {
+ DatabaseWrapper wrapper = databases.remove(database);
+ ActorRef notifier = wrapper.getDatabaseNotifier();
+ notifier.tell(new DatabaseChangeNotifier.DatabaseRemoved(database), getSelf());
+ notifier.tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ private void handleTableRefreshed(MetaDataRetriever.TableRefreshed message) {
+ ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase());
+ updateTable(message.getDatabase(), message.getTable());
+ databaseChangeNotifier.tell(new DatabaseChangeNotifier.TableUpdated(message.getTable()), getSelf());
+ }
+
+ private void handleAllTableRefreshed(MetaDataRetriever.AllTableRefreshed message) {
+ ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase());
+ databaseChangeNotifier.tell(new DatabaseChangeNotifier.AllTablesUpdated(message.getDatabase()), getSelf());
+ if (checkIfAllTablesOfAllDatabaseRefreshed(message)) {
+ refreshInProgress = false;
+ }
+ }
+
+ private void handleGetDatabases(GetDatabases message) {
+ if (refreshInProgress) {
+ // If currently refreshing, then schedule the same message after 500 milliseconds
+ getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS),
+ getSelf(), message, getContext().dispatcher(), getSender());
+ return;
+ }
+ Set<DatabaseInfo> infos = new HashSet<>();
+ for (DatabaseWrapper wrapper : databases.values()) {
+ infos.add(wrapper.getDatabase());
+ }
+ getSender().tell(new DatabasesResult(infos), getSelf());
+ }
+
+ private boolean checkIfAllTablesOfAllDatabaseRefreshed(MetaDataRetriever.AllTableRefreshed message) {
+ databasesToUpdate.remove(message.getDatabase());
+ return databasesToUpdate.isEmpty();
+ }
+
+ private ActorRef getDatabaseChangeNotifier(String databaseName) {
+ DatabaseWrapper wrapper = databases.get(databaseName);
+ ActorRef databaseChangeNotifier = null;
+ if (wrapper != null) {
+ databaseChangeNotifier = wrapper.getDatabaseNotifier();
+ }
+ return databaseChangeNotifier;
+ }
+
+ private void refresh() {
+ LOG.info("Received refresh for user");
+ refreshInProgress = true;
+ metaDataRetriever.tell(new MetaDataRetriever.RefreshDB(), getSelf());
+
+ scheduleRefreshAfter(1, TimeUnit.MINUTES);
+ }
+
+ private void scheduleRefreshAfter(long time, TimeUnit timeUnit) {
+ getContext().system().scheduler().scheduleOnce(Duration.create(time, timeUnit),
+ getSelf(), new Refresh(username), getContext().dispatcher(), getSelf());
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ LOG.info("Database Manager stopped!!!");
+ connectable.disconnect();
+ }
+
+ private void updateTable(String databaseName, TableInfo table) {
+ DatabaseWrapper wrapper = databases.get(databaseName);
+ if (wrapper != null) {
+ DatabaseInfo info = wrapper.getDatabase();
+ info.getTables().add(table);
+ }
+ }
+
+ public static Props props(ViewContext context) {
+ ConnectionConfig config = ConnectionFactory.create(context);
+ Connectable connectable = new HiveConnectionWrapper(config.getJdbcUrl(), config.getUsername(), config.getPassword(), new AuthParams(context));
+ return Props.create(DatabaseManager.class, config.getUsername(), connectable);
+ }
+
+ public static class Refresh {
+ private final String username;
+
+ public Refresh(String username) {
+ this.username = username;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+ }
+
+ private static class SelfRefresh {
+ }
+
+ private class DatabaseWrapper {
+ private final DatabaseInfo database;
+ private final ActorRef databaseNotifier;
+
+ private DatabaseWrapper(DatabaseInfo database) {
+ this.database = database;
+ databaseNotifier = getContext().actorOf(DatabaseChangeNotifier.props());
+ }
+
+ public DatabaseInfo getDatabase() {
+ return database;
+ }
+
+ public ActorRef getDatabaseNotifier() {
+ return databaseNotifier;
+ }
+ }
+
+ public static class GetDatabases {
+ private final String username;
+
+ public GetDatabases(String username) {
+ this.username = username;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+ }
+
+ public static class DatabasesResult {
+ private final Set<DatabaseInfo> databases;
+
+ public DatabasesResult(Set<DatabaseInfo> databases) {
+ this.databases = databases;
+ }
+
+ public Set<DatabaseInfo> getDatabases() {
+ return databases;
+ }
+ }
+}
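
A caller can request the cached database list from a running DatabaseManager with Akka's ask pattern; handleGetDatabases() re-schedules the request internally while a refresh is in progress, so the reply simply arrives once the refresh settles. A hedged sketch, assuming classic Akka and an already-started DatabaseManager reference:

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

public class GetDatabasesSketch {
  // Blocks the calling thread for up to 30 seconds; the actor itself stays non-blocking.
  public static DatabaseManager.DatabasesResult fetch(ActorRef databaseManager, String username) throws Exception {
    Timeout timeout = new Timeout(Duration.create(30, "seconds"));
    Future<Object> future = Patterns.ask(databaseManager, new DatabaseManager.GetDatabases(username), timeout);
    return (DatabaseManager.DatabasesResult) Await.result(future, timeout.duration());
  }
}
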
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
new file mode 100644
index 0000000..58cefcd
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.RegisterActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+public class DeathWatch extends HiveActor {
+
+ private final static Logger LOG =
+ LoggerFactory.getLogger(DeathWatch.class);
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if(message instanceof RegisterActor){
+ RegisterActor registerActor = (RegisterActor) message;
+ ActorRef actorRef = registerActor.getActorRef();
+ this.getContext().watch(actorRef);
+ LOG.info("Registered new actor "+ actorRef);
+ LOG.info("Registration for {} at {}", actorRef,new Date());
+ }
+ if(message instanceof Terminated){
+ Terminated terminated = (Terminated) message;
+ ActorRef actor = terminated.actor();
+ LOG.info("Received terminate for actor "+ actor);
+ LOG.info("Termination for {} at {}", actor,new Date());
+
+ }
+
+ }
+}
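
DeathWatch is built on Akka's death-watch mechanism: getContext().watch(ref) subscribes this actor to a Terminated notification that is delivered exactly once when the watched actor stops, however it stopped. A minimal sketch of the same pattern in isolation (WatchSketch is illustrative, not part of the view):

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

public class WatchSketch extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    if (message instanceof ActorRef) {
      // Subscribe; a Terminated message will arrive when this actor stops.
      getContext().watch((ActorRef) message);
    } else if (message instanceof Terminated) {
      System.out.println("Watched actor stopped: " + ((Terminated) message).actor());
    } else {
      unhandled(message);
    }
  }
}
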
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
new file mode 100644
index 0000000..384b798
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class HiveActor extends UntypedActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ @Override
+ final public void onReceive(Object message) throws Exception {
+ HiveMessage hiveMessage = new HiveMessage(message);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Received message: " + message.getClass().getName() + ", generated id: " + hiveMessage.getId() +
+ " sent by: " + sender() + ", recieved by" + self());
+ }
+
+ handleMessage(hiveMessage);
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Message submitted: " + hiveMessage.getId());
+
+ }
+ }
+
+ public abstract void handleMessage(HiveMessage hiveMessage);
+
+
+
+}
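
HiveActor is a thin template over UntypedActor: the final onReceive() wraps every incoming message in a HiveMessage (which assigns the id used in the debug logs) and delegates to handleMessage(). A subclass only needs to switch on the payload type; a hypothetical example:

import org.apache.ambari.view.hive20.actor.message.HiveMessage;

// EchoActor is illustrative and not part of the view.
public class EchoActor extends HiveActor {
  @Override
  public void handleMessage(HiveMessage hiveMessage) {
    Object message = hiveMessage.getMessage();
    if (message instanceof String) {
      getSender().tell("echo: " + message, getSelf());
    } else {
      unhandled(message);
    }
  }
}
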
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
new file mode 100644
index 0000000..ce58c8c
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.AuthParams;
+import org.apache.ambari.view.hive20.ConnectionDelegate;
+import org.apache.ambari.view.hive20.actor.message.Connect;
+import org.apache.ambari.view.hive20.actor.message.FetchError;
+import org.apache.ambari.view.hive20.actor.message.FetchResult;
+import org.apache.ambari.view.hive20.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive20.actor.message.HiveJob;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.ResultInformation;
+import org.apache.ambari.view.hive20.actor.message.ResultNotReady;
+import org.apache.ambari.view.hive20.actor.message.RunStatement;
+import org.apache.ambari.view.hive20.actor.message.SQLStatementJob;
+import org.apache.ambari.view.hive20.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive20.actor.message.job.ExecuteNextStatement;
+import org.apache.ambari.view.hive20.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive20.actor.message.job.Failure;
+import org.apache.ambari.view.hive20.actor.message.job.NoResult;
+import org.apache.ambari.view.hive20.actor.message.job.ResultSetHolder;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.actor.message.job.SaveGuidToDB;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.TerminateInactivityCheck;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.ConnectionException;
+import org.apache.ambari.view.hive20.persistence.Storage;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.utils.HiveActorConfiguration;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wraps one JDBC connection per user, per instance. It executes the statements and creates child actors
+ * that handle ResultSet extraction, YARN/ATS querying for ExecuteJob information, and log aggregation.
+ */
+public class JdbcConnector extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ public static final String SUFFIX = "validating the login";
+
+ /**
+ * Interval for maximum inactivity allowed
+ */
+ private final static long MAX_INACTIVITY_INTERVAL = 5 * 60 * 1000;
+
+ /**
+ * Interval for maximum inactivity allowed before termination
+ */
+ private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000;
+
+ private static final long MILLIS_IN_SECOND = 1000L;
+
+ private final Storage storage;
+
+ /**
+ * Keeps track of the timestamp when the last activity has happened. This is
+ * used to calculate the inactivity period and take lifecycle decisions based
+ * on it.
+ */
+ private long lastActivityTimestamp;
+
+ /**
+ * Akka scheduler to tick at an interval to deal with inactivity of this actor
+ */
+ private Cancellable inactivityScheduler;
+
+ /**
+ * Akka scheduler to tick at an interval to deal with the inactivity after which
+ * the actor should be killed and connection should be released
+ */
+ private Cancellable terminateActorScheduler;
+
+ private Connectable connectable = null;
+ private final ActorRef deathWatch;
+ private final ConnectionDelegate connectionDelegate;
+ private final ActorRef parent;
+ private ActorRef statementExecutor = null;
+ private final HdfsApi hdfsApi;
+ private final AuthParams authParams;
+
+ /**
+ * true if the actor is currently executing any job.
+ */
+ private boolean executing = false;
+ private HiveJob.Type executionType = HiveJob.Type.SYNC;
+
+ /**
+ * Holds the timeout configurations.
+ */
+ private final HiveActorConfiguration actorConfiguration;
+ private String username;
+ private Optional<String> jobId = Optional.absent();
+ private Optional<String> logFile = Optional.absent();
+ private int statementsCount = 0;
+
+ private ActorRef commandSender = null;
+
+ private ActorRef resultSetIterator = null;
+ private boolean isFailure = false;
+ private Failure failure = null;
+ private boolean isCancelCalled = false;
+
+ /**
+ * For every execution, this will hold the statements that are left to execute
+ */
+ private Queue<String> statementQueue = new ArrayDeque<>();
+
+ public JdbcConnector(ViewContext viewContext, ActorRef parent, ActorRef deathWatch, HdfsApi hdfsApi,
+ ConnectionDelegate connectionDelegate, Storage storage) {
+ this.hdfsApi = hdfsApi;
+ this.parent = parent;
+ this.deathWatch = deathWatch;
+ this.connectionDelegate = connectionDelegate;
+ this.storage = storage;
+ this.lastActivityTimestamp = System.currentTimeMillis();
+ resultSetIterator = null;
+
+ authParams = new AuthParams(viewContext);
+ actorConfiguration = new HiveActorConfiguration(viewContext);
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if (message instanceof InactivityCheck) {
+ checkInactivity();
+ } else if (message instanceof TerminateInactivityCheck) {
+ checkTerminationInactivity();
+ } else if (message instanceof KeepAlive) {
+ keepAlive();
+ } else if (message instanceof CleanUp) {
+ cleanUp();
+ } else {
+ handleNonLifecycleMessage(hiveMessage);
+ }
+ }
+
+ private void handleNonLifecycleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ keepAlive();
+ if (message instanceof Connect) {
+ connect((Connect) message);
+ } else if (message instanceof SQLStatementJob) {
+ runStatementJob((SQLStatementJob) message);
+ } else if (message instanceof GetColumnMetadataJob) {
+ runGetMetaData((GetColumnMetadataJob) message);
+ } else if (message instanceof ExecuteNextStatement) {
+ executeNextStatement();
+ } else if (message instanceof ResultInformation) {
+ gotResultBack((ResultInformation) message);
+ } else if (message instanceof CancelJob) {
+ cancelJob((CancelJob) message);
+ } else if (message instanceof FetchResult) {
+ fetchResult((FetchResult) message);
+ } else if (message instanceof FetchError) {
+ fetchError((FetchError) message);
+ } else if (message instanceof SaveGuidToDB) {
+ saveGuid((SaveGuidToDB) message);
+ } else if (message instanceof SaveDagInformation) {
+ saveDagInformation((SaveDagInformation) message);
+ } else {
+ unhandled(message);
+ }
+ }
+
+ private void fetchError(FetchError message) {
+ if (isFailure) {
+ sender().tell(Optional.of(failure), self());
+ return;
+ }
+ sender().tell(Optional.absent(), self());
+ }
+
+ private void fetchResult(FetchResult message) {
+ if (isFailure) {
+ sender().tell(failure, self());
+ return;
+ }
+
+ if (executing) {
+ sender().tell(new ResultNotReady(jobId.get(), username), self());
+ return;
+ }
+ sender().tell(Optional.fromNullable(resultSetIterator), self());
+ }
+
+ private void cancelJob(CancelJob message) {
+ if (!executing || connectionDelegate == null) {
+ LOG.error("Cannot cancel job for user as currently the job is not running or started. JobId: {}", message.getJobId());
+ return;
+ }
+ LOG.info("Cancelling job for user. JobId: {}, user: {}", message.getJobId(), username);
+ try {
+ isCancelCalled = true;
+ connectionDelegate.cancel();
+ } catch (SQLException e) {
+ LOG.error("Failed to cancel job. JobId: {}. {}", message.getJobId(), e);
+ }
+ }
+
+ private void gotResultBack(ResultInformation message) {
+ Optional<Failure> failureOptional = message.getFailure();
+ if (failureOptional.isPresent()) {
+ Failure failure = failureOptional.get();
+ processFailure(failure);
+ return;
+ }
+ if (statementQueue.size() == 0) {
+ // This is the last resultSet
+ processResult(message.getResultSet());
+ } else {
+ self().tell(new ExecuteNextStatement(), self());
+ }
+ }
+
+ private void processCancel() {
+ executing = false;
+ if (isAsync() && jobId.isPresent()) {
+ LOG.error("Job canceled by user for JobId: {}", jobId.get());
+ updateJobStatus(jobId.get(), Job.JOB_STATE_CANCELED);
+ }
+ }
+
+ private void processFailure(Failure failure) {
+ executing = false;
+ isFailure = true;
+ this.failure = failure;
+ if (isAsync() && jobId.isPresent()) {
+ if(isCancelCalled) {
+ processCancel();
+ return;
+ }
+ updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+ } else {
+ // Send for sync execution
+ commandSender.tell(new ExecutionFailed(failure.getMessage(), failure.getError()), self());
+ cleanUpWithTermination();
+ }
+ }
+
+ private void processResult(Optional<ResultSet> resultSetOptional) {
+ executing = false;
+
+ LOG.info("Finished processing SQL statements for Job id : {}", jobId.or("SYNC JOB"));
+ if (isAsync() && jobId.isPresent()) {
+ updateJobStatus(jobId.get(), Job.JOB_STATE_FINISHED);
+ }
+
+ if (resultSetOptional.isPresent()) {
+ ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+ resultSetOptional.get(), isAsync()).withDispatcher("akka.actor.result-dispatcher"),
+ "ResultSetIterator:" + UUID.randomUUID().toString());
+ resultSetIterator = resultSetActor;
+ if (!isAsync()) {
+ commandSender.tell(new ResultSetHolder(resultSetActor), self());
+ }
+ } else {
+ resultSetIterator = null;
+ if (!isAsync()) {
+ commandSender.tell(new NoResult(), self());
+ }
+ }
+ }
+
+ private void executeNextStatement() {
+ if (statementQueue.isEmpty()) {
+ jobExecutionCompleted();
+ return;
+ }
+
+ int index = statementsCount - statementQueue.size();
+ String statement = statementQueue.poll();
+ if (statementExecutor == null) {
+ statementExecutor = getStatementExecutor();
+ }
+
+ if (isAsync()) {
+ statementExecutor.tell(new RunStatement(index, statement, jobId.get(), true, logFile.get(), true), self());
+ } else {
+ statementExecutor.tell(new RunStatement(index, statement), self());
+ }
+ }
+
+ private void runStatementJob(SQLStatementJob message) {
+ executing = true;
+ jobId = message.getJobId();
+ logFile = message.getLogFile();
+ executionType = message.getType();
+ commandSender = getSender();
+
+ resetToInitialState();
+
+ if (!checkConnection()) return;
+
+ for (String statement : message.getStatements()) {
+ statementQueue.add(statement);
+ }
+ statementsCount = statementQueue.size();
+
+ if (isAsync() && jobId.isPresent()) {
+ updateJobStatus(jobId.get(), Job.JOB_STATE_RUNNING);
+ startInactivityScheduler();
+ }
+ self().tell(new ExecuteNextStatement(), self());
+ }
+
+ public boolean checkConnection() {
+ if (connectable == null) {
+ notifyConnectFailure(new SQLException("Hive connection is not created"));
+ return false;
+ }
+
+ Optional<HiveConnection> connectionOptional = connectable.getConnection();
+ if (!connectionOptional.isPresent()) {
+ SQLException sqlException = connectable.isUnauthorized() ? new SQLException("Hive Connection not Authorized", "AUTHFAIL")
+ : new SQLException("Hive connection is not created");
+ notifyConnectFailure(sqlException);
+ return false;
+ }
+ return true;
+ }
+
+ private void runGetMetaData(GetColumnMetadataJob message) {
+ if (!checkConnection()) return;
+ resetToInitialState();
+ executing = true;
+ executionType = message.getType();
+ commandSender = getSender();
+ statementExecutor = getStatementExecutor();
+ statementExecutor.tell(message, self());
+ }
+
+ private ActorRef getStatementExecutor() {
+ return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
+ .withDispatcher("akka.actor.result-dispatcher"),
+ "StatementExecutor:" + UUID.randomUUID().toString());
+ }
+
+ private boolean isAsync() {
+ return executionType == HiveJob.Type.ASYNC;
+ }
+
+ private void notifyConnectFailure(Exception ex) {
+ executing = false;
+ isFailure = true;
+ this.failure = new Failure("Cannot connect to hive", ex);
+ if (isAsync()) {
+ updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+
+ if(ex instanceof ConnectionException){
+ ConnectionException connectionException = (ConnectionException) ex;
+ Throwable cause = connectionException.getCause();
+ if(cause instanceof SQLException){
+ SQLException sqlException = (SQLException) cause;
+ if(isLoginError(sqlException))
+ return;
+ }
+ }
+
+ } else {
+ sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
+ }
+ // Do not clean up in case of failed authorization;
+ // the failure is bubbled up to the user so that credentials can be requested.
+
+ if (!(ex instanceof SQLException) || !((SQLException) ex).getSQLState().equals("AUTHFAIL")) {
+ cleanUpWithTermination();
+ }
+ }
+
+ private boolean isLoginError(SQLException ce) {
+ return ce.getCause().getMessage().toLowerCase().endsWith(SUFFIX);
+ }
+
+ private void keepAlive() {
+ lastActivityTimestamp = System.currentTimeMillis();
+ }
+
+ private void jobExecutionCompleted() {
+ // Set executing to false so that the inactivity checks can finish cleanup
+ // after the timeout.
+ LOG.info("Job execution completed for user: {}. Results are ready to be fetched", username);
+ this.executing = false;
+ }
+
+ protected Optional<String> getUsername() {
+ return Optional.fromNullable(username);
+ }
+
+ private void connect(Connect message) {
+ username = message.getUsername();
+ jobId = message.getJobId();
+ executionType = message.getType();
+ // check the connectable
+ if (connectable == null) {
+ connectable = message.getConnectable(authParams);
+ }
+ // open the connection to Hive
+ try {
+ if (!connectable.isOpen()) {
+ connectable.connect();
+ }
+ } catch (ConnectionException e) {
+ LOG.error("Failed to create a hive connection. {}", e);
+ // set up job failure
+ // notify parent about job failure
+ notifyConnectFailure(e);
+ return;
+ }
+ startTerminateInactivityScheduler();
+ }
+
+ private void updateJobStatus(String jobid, final String status) {
+ new JobSaver(jobid) {
+ @Override
+ protected void update(JobImpl job) {
+ job.setStatus(status);
+ job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
+ }
+ }.save();
+ LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
+ }
+
+ private void saveGuid(final SaveGuidToDB message) {
+ new JobSaver(message.getJobId()) {
+ @Override
+ protected void update(JobImpl job) {
+ job.setGuid(message.getGuid());
+ }
+ }.save();
+ LOG.info("Stored GUID for Job id: {} as '{}'", message.getJobId(), message.getGuid());
+ }
+
+ private void saveDagInformation(final SaveDagInformation message) {
+ if(message.getDagId() == null &&
+ message.getDagName() == null &&
+ message.getApplicationId() == null) {
+ LOG.error("Cannot save Dag Information for job Id: {} as all the properties are null.", message.getJobId());
+ return;
+ }
+ new JobSaver(message.getJobId()) {
+
+ @Override
+ protected void update(JobImpl job) {
+ if (message.getApplicationId() != null) {
+ job.setApplicationId(message.getApplicationId());
+ }
+ if (message.getDagId() != null) {
+ job.setDagId(message.getDagId());
+ }
+ if(message.getDagName() != null) {
+ job.setDagName(message.getDagName());
+ }
+ }
+ }.save();
+ LOG.info("Store Dag Information for job. Job id: {}, dagName: {}, dagId: {}, applicationId: {}", message.getJobId(), message.getDagName(), message.getDagId(), message.getApplicationId());
+ }
+
+ private Long getUpdatedDuration(Long dateSubmitted) {
+ return (System.currentTimeMillis() / MILLIS_IN_SECOND) - (dateSubmitted / MILLIS_IN_SECOND);
+ }
+
+
+ private void checkInactivity() {
+ LOG.debug("Inactivity check, executing status: {}", executing);
+ if (executing) {
+ keepAlive();
+ return;
+ }
+ long current = System.currentTimeMillis();
+ if ((current - lastActivityTimestamp) > actorConfiguration.getInactivityTimeout(MAX_INACTIVITY_INTERVAL)) {
+ // Stop all the sub-actors created
+ cleanUp();
+ }
+ }
+
+ private void checkTerminationInactivity() {
+ if (!isAsync()) {
+ // Should not terminate if job is sync. Will terminate after the job is finished.
+ stopTerminateInactivityScheduler();
+ return;
+ }
+
+ LOG.debug("Termination check, executing status: {}", executing);
+ if (executing) {
+ keepAlive();
+ return;
+ }
+
+ long current = System.currentTimeMillis();
+ if ((current - lastActivityTimestamp) > actorConfiguration.getTerminationTimeout(MAX_TERMINATION_INACTIVITY_INTERVAL)) {
+ cleanUpWithTermination();
+ }
+ }
+
+ private void cleanUp() {
+ if (jobId.isPresent()) {
+ LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId.get());
+ } else {
+ LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name());
+ }
+ this.executing = false;
+ cleanUpStatementAndResultSet();
+ stopInactivityScheduler();
+ parent.tell(new FreeConnector(username, jobId.orNull(), isAsync()), self());
+ }
+
+ private void cleanUpWithTermination() {
+ this.executing = false;
+ LOG.debug("{} :: Cleaning up resources with inactivity for execution.", self().path().name());
+ cleanUpStatementAndResultSet();
+
+ stopInactivityScheduler();
+ stopTerminateInactivityScheduler();
+ parent.tell(new DestroyConnector(username, jobId.orNull(), isAsync()), this.self());
+ self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+
+ private void cleanUpStatementAndResultSet() {
+ connectionDelegate.closeStatement();
+ connectionDelegate.closeResultSet();
+ }
+
+ private void startTerminateInactivityScheduler() {
+ this.terminateActorScheduler = getContext().system().scheduler().schedule(
+ Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
+ this.getSelf(), new TerminateInactivityCheck(), getContext().dispatcher(), null);
+ }
+
+ private void stopTerminateInactivityScheduler() {
+ if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) {
+ terminateActorScheduler.cancel();
+ }
+ }
+
+ private void startInactivityScheduler() {
+ if (inactivityScheduler != null) {
+ inactivityScheduler.cancel();
+ }
+ inactivityScheduler = getContext().system().scheduler().schedule(
+ Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
+ this.self(), new InactivityCheck(), getContext().dispatcher(), null);
+ }
+
+ private void stopInactivityScheduler() {
+ if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) {
+ inactivityScheduler.cancel();
+ }
+ }
+
+ private void resetToInitialState() {
+ isFailure = false;
+ failure = null;
+ resultSetIterator = null;
+ isCancelCalled = false;
+ statementQueue = new ArrayDeque<>();
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ stopInactivityScheduler();
+ stopTerminateInactivityScheduler();
+
+ if (connectable.isOpen()) {
+ connectable.disconnect();
+ }
+ }
+
+ /**
+ * Saves the job to the database.
+ */
+ private abstract class JobSaver {
+ private final String jobId;
+
+ JobSaver(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public void save() {
+ try {
+ JobImpl job = storage.load(JobImpl.class, jobId);
+ update(job);
+ storage.store(JobImpl.class, job);
+ } catch (ItemNotFound itemNotFound) {
+ itemNotFound(jobId);
+ }
+ }
+
+ /**
+ * Hook invoked when the job is not found in storage. Currently a no-op.
+ */
+ private void itemNotFound(String jobId) {
+ // Nothing to do
+ }
+
+ protected abstract void update(JobImpl job);
+ }
+}
\ No newline at end of file
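
JdbcConnector drives its lifecycle decisions through its own mailbox rather than a background thread: a periodic InactivityCheck tick is scheduled, every real message refreshes lastActivityTimestamp via keepAlive(), and the tick handler compares the elapsed time against the configured limit. A self-contained sketch of that pattern, with all names and timeouts illustrative:

import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;

public class InactivitySketch extends UntypedActor {
  static final class Tick {}

  private long lastActivity = System.currentTimeMillis();
  private Cancellable ticker;

  @Override
  public void preStart() {
    ticker = getContext().system().scheduler().schedule(
        Duration.Zero(), Duration.create(15, TimeUnit.SECONDS),
        getSelf(), new Tick(), getContext().dispatcher(), null);
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof Tick) {
      if (System.currentTimeMillis() - lastActivity > 5 * 60 * 1000) {
        getContext().stop(getSelf()); // JdbcConnector instead frees or destroys its connection here
      }
    } else {
      lastActivity = System.currentTimeMillis(); // any real message counts as activity
    }
  }

  @Override
  public void postStop() {
    if (ticker != null && !ticker.isCancelled()) {
      ticker.cancel();
    }
  }
}
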
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
new file mode 100644
index 0000000..f9c21b4
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import com.google.common.base.Joiner;
+import org.apache.ambari.view.hive20.actor.message.GetMoreLogs;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished;
+import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsApiException;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads the logs for an ExecuteJob from the Statement and writes them to HDFS.
+ */
+public class LogAggregator extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ public static final int AGGREGATION_INTERVAL = 5 * 1000;
+ private final HdfsApi hdfsApi;
+ private final HiveStatement statement;
+ private final String logFile;
+
+ private Cancellable moreLogsScheduler;
+ private ActorRef parent;
+ private boolean hasStartedFetching = false;
+ private boolean shouldFetchMore = true;
+
+ public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+ this.hdfsApi = hdfsApi;
+ this.statement = statement;
+ this.logFile = logFile;
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if (message instanceof StartLogAggregation) {
+ start();
+ }
+
+ if (message instanceof GetMoreLogs) {
+ try {
+ getMoreLogs();
+ } catch (SQLException e) {
+ LOG.error("SQL Error while getting logs. Tried writing to: {}", logFile);
+ } catch (HdfsApiException e) {
+ LOG.warn("HDFS Error while getting writing logs to {}", logFile);
+
+ }
+ }
+ }
+
+ private void start() {
+ parent = this.getSender();
+ hasStartedFetching = false;
+ shouldFetchMore = true;
+ if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
+ moreLogsScheduler.cancel();
+ }
+ this.moreLogsScheduler = getContext().system().scheduler().schedule(
+ Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+ this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+ }
+
+ private void getMoreLogs() throws SQLException, HdfsApiException {
+ List<String> logs = statement.getQueryLog();
+ if (logs.size() > 0 && shouldFetchMore) {
+ String allLogs = Joiner.on("\n").skipNulls().join(logs);
+ HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+ if(!statement.hasMoreLogs()) {
+ shouldFetchMore = false;
+ }
+ } else {
+ // Cancel the timer only when log fetching has been started
+ if(!shouldFetchMore) {
+ moreLogsScheduler.cancel();
+ parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+ }
+ }
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
+ moreLogsScheduler.cancel();
+ }
+
+ }
+
+}
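
The driver-side API polled here is Hive JDBC's HiveStatement, which exposes getQueryLog() and hasMoreLogs() for incremental log retrieval. A hedged standalone sketch (the JDBC URL and query are placeholders; LogAggregator polls from an actor scheduler while the statement runs, whereas this drains synchronously afterwards):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;
import org.apache.hive.jdbc.HiveStatement;

public class QueryLogSketch {
  public static void main(String[] args) throws Exception {
    try (Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement statement = connection.createStatement()) {
      HiveStatement hiveStatement = (HiveStatement) statement;
      statement.execute("SELECT COUNT(*) FROM sample_table");
      // Drain whatever log lines the server buffered for this statement.
      while (hiveStatement.hasMoreLogs()) {
        List<String> lines = hiveStatement.getQueryLog();
        for (String line : lines) {
          System.out.println(line);
        }
      }
    }
  }
}
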
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
new file mode 100644
index 0000000..d63b3a0
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.Ping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the meta information for the Hive server. Singleton actor which stores a DatabaseManager actor in memory
+ * for each user and instance name combination.
+ */
+public class MetaDataManager extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Stores the per-user DatabaseManager actors
+ */
+ private final Map<String, ActorRef> databaseManagers = new HashMap<>();
+ private final Map<String, Cancellable> terminationSchedulers = new HashMap<>();
+ private final ViewContext context;
+
+ public MetaDataManager(ViewContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+
+ Object message = hiveMessage.getMessage();
+ if (message instanceof Ping) {
+ handlePing((Ping) message);
+ } else if (message instanceof Terminate) {
+ handleTerminate((Terminate) message);
+ } else if (message instanceof DatabaseManager.GetDatabases) {
+ handleGetDatabases((DatabaseManager.GetDatabases) message);
+ }
+ }
+
+ private void handlePing(Ping message) {
+ LOG.info("Ping message received for user: {}, instance: {}", message.getUsername(), message.getInstanceName());
+ ActorRef databaseManager = databaseManagers.get(message.getUsername());
+ if (databaseManager == null) {
+ databaseManager = createDatabaseManager(message.getUsername(), message.getInstanceName());
+ databaseManagers.put(message.getUsername(), databaseManager);
+ databaseManager.tell(new DatabaseManager.Refresh(message.getUsername()), getSelf());
+ } else {
+ cancelTerminationScheduler(message.getUsername());
+ }
+ scheduleTermination(message.getUsername());
+ }
+
+ private void handleTerminate(Terminate message) {
+ ActorRef databaseManager = databaseManagers.remove(message.username);
+ getContext().stop(databaseManager);
+ cancelTerminationScheduler(message.getUsername());
+ }
+
+ private void handleGetDatabases(DatabaseManager.GetDatabases message) {
+ String username = message.getUsername();
+ ActorRef databaseManager = databaseManagers.get(username);
+ if(databaseManager != null) {
+ databaseManager.tell(message, getSender());
+ } else {
+ // No DatabaseManager created yet. Start one with a Ping message
+ // and queue up the GetDatabases call to self.
+ getSelf().tell(new Ping(username, context.getInstanceName()), getSender());
+ getSelf().tell(message, getSender());
+ }
+ }
+
+ private void cancelTerminationScheduler(String username) {
+ Cancellable cancellable = terminationSchedulers.remove(username);
+ if (!(cancellable == null || cancellable.isCancelled())) {
+ LOG.info("Cancelling termination scheduler");
+ cancellable.cancel();
+ }
+ }
+
+ private void scheduleTermination(String username) {
+ Cancellable cancellable = context().system().scheduler().scheduleOnce(Duration.create(2, TimeUnit.MINUTES),
+ getSelf(), new Terminate(username), getContext().dispatcher(), getSelf());
+ terminationSchedulers.put(username, cancellable);
+ }
+
+ private ActorRef createDatabaseManager(String username, String instanceName) {
+ LOG.info("Creating database manager for username: {}, instance: {}", username, instanceName);
+ return context().actorOf(DatabaseManager.props(context));
+ }
+
+ public static Props props(ViewContext viewContext) {
+ return Props.create(MetaDataManager.class, viewContext);
+ }
+
+ private class Terminate {
+ public final String username;
+
+ public Terminate(String username) {
+ this.username = username;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+ }
+
+}
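
A hedged wiring sketch for the flow above: the view sends a Ping whenever a user touches it, and MetaDataManager lazily creates that user's DatabaseManager and (re)schedules the two-minute termination. The Ping constructor arguments mirror the handleGetDatabases() call above; the ViewContext is assumed to be supplied by the Ambari view framework:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.ambari.view.ViewContext;
import org.apache.ambari.view.hive20.actor.message.Ping;

public class MetaDataWiringSketch {
  public static ActorRef startAndPing(ActorSystem system, ViewContext viewContext) {
    ActorRef metaDataManager = system.actorOf(MetaDataManager.props(viewContext));
    // The first Ping creates the per-user DatabaseManager and triggers a Refresh.
    metaDataManager.tell(new Ping(viewContext.getUsername(), viewContext.getInstanceName()), ActorRef.noSender());
    return metaDataManager;
  }
}
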
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
new file mode 100644
index 0000000..7323a0a
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.internal.Connectable;
+import org.apache.ambari.view.hive20.internal.ConnectionException;
+import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo;
+import org.apache.ambari.view.hive20.internal.dto.TableInfo;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Fetches database and table metadata from Hive and reports refresh progress back to its parent.
+ */
+public class MetaDataRetriever extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private final Connectable connectable;
+
+ public MetaDataRetriever(Connectable connectable) {
+ this.connectable = connectable;
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if (message instanceof RefreshDB) {
+ handleRefreshDB();
+ }
+ }
+
+ private void handleRefreshDB() {
+ try {
+ refreshDatabaseInfos();
+ } catch (ConnectionException | SQLException e) {
+ LOG.error("Failed to update the complete database information. Exception: {}", e);
+ getSender().tell(new DBRefreshFailed(e), getSelf());
+ }
+ }
+
+ private HiveConnection getHiveConnection() throws ConnectionException {
+ if (!connectable.isOpen()) {
+ connectable.connect();
+ }
+ Optional<HiveConnection> connectionOptional = connectable.getConnection();
+ return connectionOptional.get();
+ }
+
+ private void refreshDatabaseInfos() throws ConnectionException, SQLException {
+ HiveConnection connection = getHiveConnection();
+ Set<DatabaseInfo> infos = new HashSet<>();
+ try (ResultSet schemas = connection.getMetaData().getSchemas()) {
+ while (schemas.next()) {
+ DatabaseInfo info = new DatabaseInfo(schemas.getString(1));
+ infos.add(info);
+ }
+ }
+
+ getSender().tell(new DBRefreshed(infos), getSelf());
+
+ for (DatabaseInfo info : infos) {
+ refreshTablesInfo(info.getName());
+ }
+ }
+
+ private void refreshTablesInfo(String database) throws ConnectionException, SQLException {
+ HiveConnection connection = getHiveConnection();
+ try (ResultSet tables = connection.getMetaData().getTables("", database, null, null)) {
+ while (tables.next()) {
+ TableInfo info = new TableInfo(tables.getString(3), tables.getString(4));
+ getSender().tell(new TableRefreshed(info, database), getSelf());
+ }
+ }
+ getSender().tell(new AllTableRefreshed(database), getSelf());
+ }
+
+ public static Props props(Connectable connectable) {
+ return Props.create(MetaDataRetriever.class, connectable);
+ }
+
+
+ public static class RefreshDB {
+
+ }
+
+ public static class DBRefreshed {
+ private final Set<DatabaseInfo> databases;
+
+ public DBRefreshed(Set<DatabaseInfo> databases) {
+ this.databases = databases;
+ }
+
+ public Set<DatabaseInfo> getDatabases() {
+ return databases;
+ }
+ }
+
+ public static class DBRefreshFailed {
+ private final Exception exception;
+
+ public DBRefreshFailed(Exception exception) {
+ this.exception = exception;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+ }
+
+ public static class TableRefreshed {
+ private final TableInfo table;
+ private final String database;
+
+ public TableRefreshed(TableInfo table, String database) {
+ this.table = table;
+ this.database = database;
+ }
+
+ public TableInfo getTable() {
+ return table;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+ }
+
+ public static class AllTableRefreshed {
+ private final String database;
+
+ public AllTableRefreshed(String database) {
+ this.database = database;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+ }
+}
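
A minimal usage sketch (not part of the patch; the connectable reference and the actor name are assumed, and the snippet runs inside a parent actor):

    // Create the retriever and ask for a refresh. Replies arrive at the
    // sender of RefreshDB as one DBRefreshed message, then one TableRefreshed
    // per table and one AllTableRefreshed per database, or a DBRefreshFailed
    // message if the metadata query fails.
    ActorRef retriever = getContext().actorOf(
        MetaDataRetriever.props(connectable), "meta-data-retriever");
    retriever.tell(new MetaDataRetriever.RefreshDB(), self());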
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
new file mode 100644
index 0000000..f751d8f
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.ConnectionDelegate;
+import org.apache.ambari.view.hive20.actor.message.Connect;
+import org.apache.ambari.view.hive20.actor.message.ExecuteJob;
+import org.apache.ambari.view.hive20.actor.message.FetchError;
+import org.apache.ambari.view.hive20.actor.message.FetchResult;
+import org.apache.ambari.view.hive20.actor.message.HiveJob;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.JobRejected;
+import org.apache.ambari.view.hive20.actor.message.RegisterActor;
+import org.apache.ambari.view.hive20.actor.message.SQLStatementJob;
+import org.apache.ambari.view.hive20.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive20.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive20.internal.ContextSupplier;
+import org.apache.ambari.view.hive20.persistence.Storage;
+import org.apache.ambari.view.hive20.utils.LoggingOutputStream;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.commons.collections4.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Router actor that controls the operations. It delegates operations to the
+ * underlying child actors and stores their state.
+ */
+public class OperationController extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private final ActorSystem system;
+ private final ActorRef deathWatch;
+ private final ContextSupplier<ConnectionDelegate> connectionSupplier;
+ private final ContextSupplier<Storage> storageSupplier;
+ private final ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier;
+
+ /**
+ * Per-user connections that are currently idle and available for async jobs.
+ */
+ private final Map<String, Queue<ActorRef>> asyncAvailableConnections;
+
+ /**
+ * Per-user connections that are currently idle and available for sync jobs.
+ */
+ private final Map<String, Queue<ActorRef>> syncAvailableConnections;
+
+
+ /**
+ * Per-user, per-job connections that are currently executing async jobs.
+ */
+ private final Map<String, Map<String, ActorRef>> asyncBusyConnections;
+
+ /**
+ * Per-user connections currently executing sync jobs such as fetching
+ * databases and tables.
+ */
+ private final Map<String, Set<ActorRef>> syncBusyConnections;
+
+
+ private final ViewContext context;
+
+ public OperationController(ActorSystem system,
+ ActorRef deathWatch,
+ ViewContext context,
+ ContextSupplier<ConnectionDelegate> connectionSupplier,
+ ContextSupplier<Storage> storageSupplier,
+ ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
+ this.system = system;
+ this.deathWatch = deathWatch;
+ this.context = context;
+ this.connectionSupplier = connectionSupplier;
+ this.storageSupplier = storageSupplier;
+ this.hdfsApiSupplier = hdfsApiSupplier;
+ this.asyncAvailableConnections = new HashMap<>();
+ this.syncAvailableConnections = new HashMap<>();
+ this.asyncBusyConnections = new HashMap<>();
+ this.syncBusyConnections = new HashMap<>();
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+
+ if (message instanceof ExecuteJob) {
+ ExecuteJob job = (ExecuteJob) message;
+ if (job.getJob().getType() == HiveJob.Type.ASYNC) {
+ sendJob(job.getConnect(), (SQLStatementJob) job.getJob());
+ } else if (job.getJob().getType() == HiveJob.Type.SYNC) {
+ sendSyncJob(job.getConnect(), job.getJob());
+ }
+ }
+
+ if (message instanceof CancelJob) {
+ cancelJob((CancelJob) message);
+ }
+
+ if (message instanceof FetchResult) {
+ fetchResultActorRef((FetchResult) message);
+ }
+
+ if (message instanceof FetchError) {
+ fetchError((FetchError) message);
+ }
+
+ if (message instanceof FreeConnector) {
+ freeConnector((FreeConnector) message);
+ }
+
+ if (message instanceof DestroyConnector) {
+ destroyConnector((DestroyConnector) message);
+ }
+
+ if (message instanceof SaveDagInformation) {
+ saveDagInformation((SaveDagInformation) message);
+ }
+ }
+
+ private void cancelJob(CancelJob message) {
+ String jobId = message.getJobId();
+ String username = message.getUsername();
+ Map<String, ActorRef> userJobs = asyncBusyConnections.get(username);
+ ActorRef actorRef = userJobs == null ? null : userJobs.get(jobId);
+ if (actorRef != null) {
+ actorRef.tell(message, sender());
+ } else {
+ String msg = String.format("Cannot cancel job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+ LOG.error(msg);
+ sender().tell(new FetchFailed(msg), self());
+ }
+ }
+
+ private void saveDagInformation(SaveDagInformation message) {
+ Map<String, ActorRef> userJobs = asyncBusyConnections.get(context.getUsername());
+ ActorRef jdbcConnection = userJobs == null ? null : userJobs.get(message.getJobId());
+ if (jdbcConnection != null) {
+ jdbcConnection.tell(message, sender());
+ } else {
+ String msg = String.format("Cannot update Dag Information for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+ LOG.error(msg);
+ }
+ }
+
+ private void fetchError(FetchError message) {
+ String jobId = message.getJobId();
+ String username = message.getUsername();
+ Map<String, ActorRef> userJobs = asyncBusyConnections.get(username);
+ ActorRef actorRef = userJobs == null ? null : userJobs.get(jobId);
+ if (actorRef != null) {
+ actorRef.tell(message, sender());
+ } else {
+ String msg = String.format("Cannot fetch error for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+ LOG.error(msg);
+ sender().tell(new FetchFailed(msg), self());
+ }
+ }
+
+ private void fetchResultActorRef(FetchResult message) {
+ String username = message.getUsername();
+ String jobId = message.getJobId();
+ Map<String, ActorRef> userJobs = asyncBusyConnections.get(username);
+ ActorRef actorRef = userJobs == null ? null : userJobs.get(jobId);
+ if (actorRef != null) {
+ actorRef.tell(message, sender());
+ } else {
+ String msg = String.format("Cannot fetch result for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+ LOG.error(msg);
+ sender().tell(new FetchFailed(msg), self());
+ }
+ }
+
+ private void sendJob(Connect connect, SQLStatementJob job) {
+ String username = job.getUsername();
+ String jobId = job.getJobId().get();
+ // Check if an idle actor is available in the pool to process this job
+ ActorRef subActor = getActorRefFromAsyncPool(username);
+ if (subActor == null) {
+ Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
+ if (!hdfsApiOptional.isPresent()) {
+ sender().tell(new JobRejected(username, jobId, "Failed to connect to HDFS."), self());
+ return;
+ }
+ HdfsApi hdfsApi = hdfsApiOptional.get();
+
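+ // Spawn the connector on the dedicated JDBC dispatcher so blocking JDBC calls do not tie up the default dispatcher.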
+ subActor = system.actorOf(
+ Props.create(JdbcConnector.class, context, self(),
+ deathWatch, hdfsApi, connectionSupplier.get(context),
+ storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+ UUID.randomUUID().toString() + ":asyncjdbcConnector");
+ deathWatch.tell(new RegisterActor(subActor), self());
+ }
+
+ if (asyncBusyConnections.containsKey(username)) {
+ Map<String, ActorRef> actors = asyncBusyConnections.get(username);
+ if (!actors.containsKey(jobId)) {
+ actors.put(jobId, subActor);
+ } else {
+ // Reject this job; a connection is already in progress for the same jobId.
+ sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
+ }
+ } else {
+ Map<String, ActorRef> actors = new HashMap<>();
+ actors.put(jobId, subActor);
+ asyncBusyConnections.put(username, actors);
+ }
+
+ // Set up the connection (the Connect message carries the job id used for termination), then submit the job.
+ subActor.tell(connect, self());
+ subActor.tell(job, self());
+
+ }
+
+ private ActorRef getActorRefFromSyncPool(String username) {
+ return getActorRefFromPool(syncAvailableConnections, username);
+ }
+
+ private ActorRef getActorRefFromAsyncPool(String username) {
+ return getActorRefFromPool(asyncAvailableConnections, username);
+ }
+
+ private ActorRef getActorRefFromPool(Map<String, Queue<ActorRef>> pool, String username) {
+ ActorRef subActor = null;
+ if (pool.containsKey(username)) {
+ Queue<ActorRef> availableActors = pool.get(username);
+ if (!availableActors.isEmpty()) {
+ subActor = availableActors.poll();
+ }
+ } else {
+ pool.put(username, new LinkedList<ActorRef>());
+ }
+ return subActor;
+ }
+
+ private void sendSyncJob(Connect connect, HiveJob job) {
+ String username = job.getUsername();
+ // Check if an idle actor is available in the pool to process this job
+ ActorRef subActor = getActorRefFromSyncPool(username);
+
+ if (subActor == null) {
+ Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
+ if (!hdfsApiOptional.isPresent()) {
+ sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
+ return;
+ }
+ HdfsApi hdfsApi = hdfsApiOptional.get();
+
+ subActor = system.actorOf(
+ Props.create(JdbcConnector.class, context, self(),
+ deathWatch, hdfsApi, connectionSupplier.get(context),
+ storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+ UUID.randomUUID().toString() + ":syncjdbcConnector");
+ deathWatch.tell(new RegisterActor(subActor), self());
+ }
+
+ if (syncBusyConnections.containsKey(username)) {
+ Set<ActorRef> actors = syncBusyConnections.get(username);
+ actors.add(subActor);
+ } else {
+ LinkedHashSet<ActorRef> actors = new LinkedHashSet<>();
+ actors.add(subActor);
+ syncBusyConnections.put(username, actors);
+ }
+
+ // For sync jobs, pass the original sender along so the ref is known at termination.
+ subActor.tell(connect, sender());
+ subActor.tell(job, sender());
+ }
+
+
+ private void destroyConnector(DestroyConnector message) {
+ ActorRef sender = getSender();
+ if (message.isForAsync()) {
+ removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+ removeFromASyncAvailable(message.getUsername(), sender);
+ } else {
+ removeFromSyncBusyPool(message.getUsername(), sender);
+ removeFromSyncAvailable(message.getUsername(), sender);
+ }
+ logMaps();
+ }
+
+ private void freeConnector(FreeConnector message) {
+ ActorRef sender = getSender();
+ if (message.isForAsync()) {
+ LOG.info("About to free connector for job {} and user {}", message.getJobId(), message.getUsername());
+ Optional<ActorRef> refOptional = removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+ if (refOptional.isPresent()) {
+ addToAsyncAvailable(message.getUsername(), refOptional.get());
+ }
+ return;
+ }
+
+ // Was a sync job, remove from sync pool
+ LOG.info("About to free sync connector for user {}", message.getUsername());
+ Optional<ActorRef> refOptional = removeFromSyncBusyPool(message.getUsername(), sender);
+ if (refOptional.isPresent()) {
+ addToSyncAvailable(message.getUsername(), refOptional.get());
+ }
+
+ logMaps();
+ }
+
+ private void logMaps() {
+ LOG.debug("Pool status");
+ LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.DEBUG);
+ PrintStream printStream = new PrintStream(out);
+ MapUtils.debugPrint(printStream, "Busy Async connections", asyncBusyConnections);
+ MapUtils.debugPrint(printStream, "Available Async connections", asyncAvailableConnections);
+ MapUtils.debugPrint(printStream, "Busy Sync connections", syncBusyConnections);
+ MapUtils.debugPrint(printStream, "Available Sync connections", syncAvailableConnections);
+ try {
+ out.close();
+ } catch (IOException e) {
+ LOG.warn("Cannot close logging output stream; this may leak resources");
+ }
+ }
+
+ private Optional<ActorRef> removeFromSyncBusyPool(String userName, ActorRef refToFree) {
+ if (syncBusyConnections.containsKey(userName)) {
+ Set<ActorRef> actorRefs = syncBusyConnections.get(userName);
+ actorRefs.remove(refToFree);
+ }
+ return Optional.of(refToFree);
+ }
+
+ private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
+ ActorRef ref = null;
+ if (asyncBusyConnections.containsKey(username)) {
+ Map<String, ActorRef> actors = asyncBusyConnections.get(username);
+ if (actors.containsKey(jobId)) {
+ ref = actors.get(jobId);
+ actors.remove(jobId);
+ }
+ }
+ return Optional.fromNullable(ref);
+ }
+
+ private void addToAsyncAvailable(String username, ActorRef actor) {
+ addToAvailable(asyncAvailableConnections, username, actor);
+ }
+
+ private void addToSyncAvailable(String username, ActorRef actor) {
+ addToAvailable(syncAvailableConnections, username, actor);
+ }
+
+ private void addToAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef actor) {
+ if (!pool.containsKey(username)) {
+ pool.put(username, new LinkedList<ActorRef>());
+ }
+
+ Queue<ActorRef> availableActors = pool.get(username);
+ availableActors.add(actor);
+ }
+
+ private void removeFromASyncAvailable(String username, ActorRef sender) {
+ removeFromAvailable(asyncAvailableConnections, username, sender);
+ }
+
+ private void removeFromSyncAvailable(String username, ActorRef sender) {
+ removeFromAvailable(syncAvailableConnections, username, sender);
+ }
+
+ private void removeFromAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef sender) {
+ if (!pool.containsKey(username)) {
+ return;
+ }
+ Queue<ActorRef> actors = pool.get(username);
+ actors.remove(sender);
+ }
+
+}
+
+
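
A rough sketch of the controller's message protocol from a caller's point of view (not part of the patch; the controller, connect, sqlJob, jobId and username references are assumed, and the message constructor signatures are inferred from the getters used above):

    // Submit an async SQL job; the controller reuses an idle per-user
    // JdbcConnector or spawns a new one on the jdbc-connector dispatcher.
    controller.tell(new ExecuteJob(connect, sqlJob), self());
    // Later, stream results or errors for that job; an unknown or expired
    // job id is answered with a FetchFailed message.
    controller.tell(new FetchResult(jobId, username), self());
    controller.tell(new CancelJob(jobId, username), self());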
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
new file mode 100644
index 0000000..4b4a407
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.actor;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.Lists;
+import org.apache.ambari.view.hive20.actor.message.CursorReset;
+import org.apache.ambari.view.hive20.actor.message.HiveMessage;
+import org.apache.ambari.view.hive20.actor.message.ResetCursor;
+import org.apache.ambari.view.hive20.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive20.actor.message.job.Next;
+import org.apache.ambari.view.hive20.actor.message.job.NoMoreItems;
+import org.apache.ambari.view.hive20.actor.message.job.Result;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.ColumnDescriptionShort;
+import org.apache.ambari.view.hive20.client.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+public class ResultSetIterator extends HiveActor {
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private static final int DEFAULT_BATCH_SIZE = 100;
+ public static final String NULL = "NULL";
+
+ private final ActorRef parent;
+ private final ResultSet resultSet;
+ private final int batchSize;
+
+ private List<ColumnDescription> columnDescriptions;
+ private int columnCount;
+ private boolean async;
+ private boolean metaDataFetched = false;
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize, boolean isAsync) {
+ this.parent = parent;
+ this.resultSet = resultSet;
+ this.batchSize = batchSize;
+ this.async = isAsync;
+ }
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
+ this(parent, resultSet, DEFAULT_BATCH_SIZE, true);
+ }
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet, boolean isAsync) {
+ this(parent, resultSet, DEFAULT_BATCH_SIZE, isAsync);
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
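+ // Every incoming message doubles as a liveness signal to the parent actor.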
+ sendKeepAlive();
+ Object message = hiveMessage.getMessage();
+ if (message instanceof Next) {
+ getNext();
+ }
+ if (message instanceof ResetCursor) {
+ resetResultSet();
+ }
+
+ if (message instanceof KeepAlive) {
+ sendKeepAlive();
+ }
+ }
+
+ private void resetResultSet() {
+ try {
+ resultSet.beforeFirst();
+ sender().tell(new CursorReset(), self());
+ } catch (SQLException e) {
+ LOG.error("Failed to reset the cursor", e);
+ sender().tell(new FetchFailed("Failed to reset the cursor", e), self());
+ cleanUpResources();
+ }
+ }
+
+ private void sendKeepAlive() {
+ LOG.debug("Sending a keep alive to {}", parent);
+ parent.tell(new KeepAlive(), self());
+ }
+
+ private void getNext() {
+ List<Row> rows = Lists.newArrayList();
+ if (!metaDataFetched) {
+ try {
+ initialize();
+ } catch (SQLException ex) {
+ LOG.error("Failed to fetch metadata for the ResultSet", ex);
+ sender().tell(new FetchFailed("Failed to get metadata for ResultSet", ex), self());
+ cleanUpResources();
+ // Bail out: without column metadata the rows below cannot be read.
+ return;
+ }
+ }
+ int index = 0;
+ try {
+ while (resultSet.next() && index < batchSize) {
+ index++;
+ rows.add(getRowFromResultSet(resultSet));
+ }
+
+ if (index == 0) {
+ // We have hit end of resultSet
+ sender().tell(new NoMoreItems(), self());
+ if (!async) {
+ cleanUpResources();
+ }
+ } else {
+ Result result = new Result(rows, columnDescriptions);
+ sender().tell(result, self());
+ }
+
+ } catch (SQLException ex) {
+ LOG.error("Failed to fetch next batch for the Resultset", ex);
+ sender().tell(new FetchFailed("Failed to fetch next batch for the Resultset", ex), self());
+ cleanUpResources();
+ }
+ }
+
+ private void cleanUpResources() {
+ parent.tell(new CleanUp(), self());
+ }
+
+ private Row getRowFromResultSet(ResultSet resultSet) throws SQLException {
+ Object[] values = new Object[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ values[i] = resultSet.getObject(i + 1);
+ }
+ return new Row(values);
+ }
+
+ private void initialize() throws SQLException {
+ metaDataFetched = true;
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ columnCount = metaData.getColumnCount();
+ columnDescriptions = Lists.newArrayList();
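+ // JDBC metadata columns are 1-based, hence the loop runs from 1 to columnCount.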
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = metaData.getColumnName(i);
+ String typeName = metaData.getColumnTypeName(i);
+ ColumnDescription description = new ColumnDescriptionShort(columnName, typeName, i);
+ columnDescriptions.add(description);
+ }
+ }
+}
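
A short sketch of the fetch loop from a parent actor's perspective (not part of the patch; resultSet is an open java.sql.ResultSet, and the no-arg message constructors are assumed):

    // Iterate a JDBC ResultSet in batches of 100 rows (the default batch size).
    ActorRef iterator = getContext().actorOf(
        Props.create(ResultSetIterator.class, self(), resultSet));
    iterator.tell(new Next(), self());
    // The iterator replies with Result(rows, columns) while rows remain,
    // NoMoreItems at the end, or FetchFailed on SQL errors; a ResetCursor
    // message rewinds to before the first row (scrollable ResultSets only).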