You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/06/27 23:36:45 UTC
[19/34] ambari git commit: AMBARI-17355 & AMBARI-17354: POC: FE & BE
changes for first class support for Yarn hosted services
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
new file mode 100644
index 0000000..ac62cf7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
+import org.apache.ambari.view.hive2.actor.message.AsyncJob;
+import org.apache.ambari.view.hive2.actor.message.Connect;
+import org.apache.ambari.view.hive2.actor.message.ExecuteJob;
+import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
+import org.apache.ambari.view.hive2.actor.message.FetchError;
+import org.apache.ambari.view.hive2.actor.message.FetchResult;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.JobRejected;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.actor.message.ResultReady;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive2.internal.ContextSupplier;
+import org.apache.ambari.view.hive2.internal.Either;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.utils.LoggingOutputStream;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections4.map.HashedMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Router actor to control the operations. This delegates the operations to underlying child actors and
+ * stores the state for them.
+ * <p>
+ * Connector actors are tracked per user in four pools: available (idle) and busy connectors, kept separately
+ * for async and sync jobs. A connector is put into a "busy" pool when a job is dispatched to it, moved to the
+ * matching "available" pool on {@link FreeConnector}, and removed from both on {@link DestroyConnector}.
+ */
+public class OperationController extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final ActorSystem system;
+  private final ActorRef deathWatch;
+  private final ContextSupplier<ConnectionDelegate> connectionSupplier;
+  private final ContextSupplier<Storage> storageSupplier;
+  private final ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier;
+
+  /**
+   * Async connectors per user which are currently idle and can be reused.
+   */
+  private final Map<String, Queue<ActorRef>> asyncAvailableConnections;
+
+  /**
+   * Sync connectors per user which are currently idle and can be reused.
+   */
+  private final Map<String, Queue<ActorRef>> syncAvailableConnections;
+
+  /**
+   * Async connectors per user / per job id which are currently working.
+   */
+  private final Map<String, Map<String, ActorRefResultContainer>> asyncBusyConnections;
+
+  /**
+   * Sync connectors per user which are currently executing sync jobs
+   * like fetching databases, tables etc.
+   */
+  private final Map<String, Set<ActorRef>> syncBusyConnections;
+
+  /**
+   * @param system             actor system used to create connector actors
+   * @param deathWatch         actor that every newly created connector is registered with
+   * @param connectionSupplier supplies the {@link ConnectionDelegate} handed to new connectors
+   * @param storageSupplier    supplies the {@link Storage} handed to new connectors
+   * @param hdfsApiSupplier    supplies the optional {@link HdfsApi}; jobs are rejected when it is absent
+   */
+  public OperationController(ActorSystem system,
+                             ActorRef deathWatch,
+                             ContextSupplier<ConnectionDelegate> connectionSupplier,
+                             ContextSupplier<Storage> storageSupplier,
+                             ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
+    this.system = system;
+    this.deathWatch = deathWatch;
+    this.connectionSupplier = connectionSupplier;
+    this.storageSupplier = storageSupplier;
+    this.hdfsApiSupplier = hdfsApiSupplier;
+    this.asyncAvailableConnections = new HashMap<>();
+    this.syncAvailableConnections = new HashMap<>();
+    this.asyncBusyConnections = new HashedMap<>();
+    this.syncBusyConnections = new HashMap<>();
+  }
+
+  /**
+   * Dispatches the wrapped message to the handler matching its type. Unknown message types are ignored.
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+
+    if (message instanceof ExecuteJob) {
+      ExecuteJob job = (ExecuteJob) message;
+      if (job.getJob().getType() == HiveJob.Type.ASYNC) {
+        sendJob(job.getConnect(), (AsyncJob) job.getJob());
+      } else if (job.getJob().getType() == HiveJob.Type.SYNC) {
+        sendSyncJob(job.getConnect(), job.getJob());
+      }
+    }
+
+    if (message instanceof ResultReady) {
+      updateResultContainer((ResultReady) message);
+    }
+
+    if (message instanceof AsyncExecutionFailed) {
+      updateResultContainerWithError((AsyncExecutionFailed) message);
+    }
+
+    if (message instanceof GetResultHolder) {
+      getResultHolder((GetResultHolder) message);
+    }
+
+    if (message instanceof FetchResult) {
+      fetchResultActorRef((FetchResult) message);
+    }
+
+    if (message instanceof FetchError) {
+      fetchError((FetchError) message);
+    }
+
+    if (message instanceof FreeConnector) {
+      freeConnector((FreeConnector) message);
+    }
+
+    if (message instanceof DestroyConnector) {
+      destroyConnector((DestroyConnector) message);
+    }
+  }
+
+  /**
+   * Looks up the bookkeeping entry of a running async job.
+   *
+   * @return the container, or {@code null} when the user has no busy connectors or the job id is unknown
+   */
+  private ActorRefResultContainer findContainer(String username, String jobId) {
+    Map<String, ActorRefResultContainer> jobs = asyncBusyConnections.get(username);
+    return jobs == null ? null : jobs.get(jobId);
+  }
+
+  /**
+   * Replies to the sender with an {@link Optional} holding the error recorded for the job, or an absent
+   * Optional when no error was recorded or the job is not (or no longer) tracked.
+   */
+  private void fetchError(FetchError message) {
+    ActorRefResultContainer container = findContainer(message.getUsername(), message.getJobId());
+    if (container != null && container.hasError) {
+      sender().tell(Optional.of(container.error), self());
+      return;
+    }
+    sender().tell(Optional.absent(), self());
+  }
+
+  /**
+   * Records an execution failure on the job's container so later fetches can report it.
+   */
+  private void updateResultContainerWithError(AsyncExecutionFailed message) {
+    String userName = message.getUsername();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(userName, jobId);
+    if (container == null) {
+      // The job was already freed/destroyed; there is nothing left to attach the error to.
+      LOG.warn("Received execution failure for unknown job {} of user {}", jobId, userName);
+      return;
+    }
+    container.hasError = true;
+    container.error = message;
+  }
+
+  /**
+   * Replies with the result holder of the job, or with an {@code Either.right} carrying an
+   * {@link AsyncExecutionFailed} when the job is not tracked.
+   */
+  private void getResultHolder(GetResultHolder message) {
+    String userName = message.getUserName();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(userName, jobId);
+    if (container != null) {
+      sender().tell(container.result, self());
+    } else {
+      Either<ActorRef, AsyncExecutionFailed> right = Either.right(
+          new AsyncExecutionFailed(jobId, userName, "Could not find the job, maybe the pool expired"));
+      sender().tell(right, self());
+    }
+  }
+
+  /**
+   * Stores the result actor for the job and kicks off processing on it.
+   */
+  private void updateResultContainer(ResultReady message) {
+    String jobId = message.getJobId();
+    String username = message.getUsername();
+    ActorRefResultContainer container = findContainer(username, jobId);
+    if (container == null) {
+      LOG.warn("Received result for unknown job {} of user {}", jobId, username);
+      return;
+    }
+
+    // set up result actor in container
+    Either<ActorRef, ActorRef> result = message.getResult();
+    container.result = result;
+
+    // start processing
+    if (result.isRight()) {
+      // Query with no result sets to be returned: execute right away
+      result.getRight().tell(new ExecuteQuery(), self());
+    }
+    if (result.isLeft()) {
+      // There is a result set to be processed
+      result.getLeft().tell(new AdvanceCursor(jobId), self());
+    }
+  }
+
+  /**
+   * Replies with the job's result holder, with the recorded {@link AsyncExecutionFailed} when the job failed,
+   * or with a new {@link AsyncExecutionFailed} when the job is not tracked.
+   */
+  private void fetchResultActorRef(FetchResult message) {
+    String username = message.getUsername();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(username, jobId);
+    if (container == null) {
+      sender().tell(
+          new AsyncExecutionFailed(jobId, username, "Could not find the job, maybe the pool expired"), self());
+      return;
+    }
+    if (container.hasError) {
+      sender().tell(container.error, self());
+      return;
+    }
+    sender().tell(container.result, self());
+  }
+
+  /**
+   * Dispatches an async job to an idle connector of the user, creating a new connector when none is idle.
+   * The job is rejected with {@link JobRejected} when a job with the same id is already running for the
+   * user or when no HDFS API is available.
+   */
+  private void sendJob(Connect connect, AsyncJob job) {
+    String username = job.getUsername();
+    String jobId = job.getJobId();
+
+    // Reject duplicates before touching the pools so no connector is taken (and lost) for a rejected job.
+    if (findContainer(username, jobId) != null) {
+      sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
+      return;
+    }
+
+    // Check if there is an available actor to process this
+    ActorRef subActor = getActorRefFromAsyncPool(username);
+    ViewContext viewContext = job.getViewContext();
+    if (subActor == null) {
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, jobId, "Failed to connect to HDFS."), self());
+        return;
+      }
+      HdfsApi hdfsApi = hdfsApiOptional.get();
+
+      subActor = system.actorOf(
+          Props.create(AsyncJdbcConnector.class, viewContext, hdfsApi, system, self(),
+              deathWatch, connectionSupplier.get(viewContext),
+              storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+          "jobId:" + jobId + ":-asyncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
+    }
+
+    Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username);
+    if (actors == null) {
+      actors = new HashMap<>();
+      asyncBusyConnections.put(username, actors);
+    }
+    actors.put(jobId, new ActorRefResultContainer(subActor));
+
+    // set up the connect with ExecuteJob id for terminations
+    subActor.tell(connect, self());
+    subActor.tell(job, self());
+  }
+
+  private ActorRef getActorRefFromSyncPool(String username) {
+    return getActorRefFromPool(syncAvailableConnections, username);
+  }
+
+  private ActorRef getActorRefFromAsyncPool(String username) {
+    return getActorRefFromPool(asyncAvailableConnections, username);
+  }
+
+  /**
+   * Takes an idle connector of the user out of the pool. Creates the user's (empty) queue on first use.
+   *
+   * @return an idle connector, or {@code null} when the user has none
+   */
+  private ActorRef getActorRefFromPool(Map<String, Queue<ActorRef>> pool, String username) {
+    Queue<ActorRef> availableActors = pool.get(username);
+    if (availableActors == null) {
+      pool.put(username, new LinkedList<ActorRef>());
+      return null;
+    }
+    // poll() returns null for an empty queue
+    return availableActors.poll();
+  }
+
+  /**
+   * Dispatches a sync job to an idle sync connector of the user, creating a new connector when none is idle.
+   * The job is rejected with {@link JobRejected} when no HDFS API is available.
+   */
+  private void sendSyncJob(Connect connect, HiveJob job) {
+    String username = job.getUsername();
+    // Check if there is an available actor to process this
+    ActorRef subActor = getActorRefFromSyncPool(username);
+    ViewContext viewContext = job.getViewContext();
+
+    if (subActor == null) {
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
+        return;
+      }
+      HdfsApi hdfsApi = hdfsApiOptional.get();
+
+      subActor = system.actorOf(
+          Props.create(SyncJdbcConnector.class, viewContext, hdfsApi, system, self(),
+              deathWatch, connectionSupplier.get(viewContext),
+              storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+          UUID.randomUUID().toString() + ":SyncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
+    }
+
+    Set<ActorRef> actors = syncBusyConnections.get(username);
+    if (actors == null) {
+      actors = new LinkedHashSet<>();
+      syncBusyConnections.put(username, actors);
+    }
+    actors.add(subActor);
+
+    // Termination requires that the ref is known in case of sync jobs
+    subActor.tell(connect, self());
+    // The job is forwarded with the original sender so the connector replies to the requester directly.
+    subActor.tell(job, sender());
+  }
+
+  /**
+   * Removes the connector from both the busy and the available pool of its kind.
+   */
+  private void destroyConnector(DestroyConnector message) {
+    ActorRef sender = getSender();
+    if (message.isForAsync()) {
+      removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+      removeFromASyncAvailable(message.getUsername(), sender);
+    } else {
+      removeFromSyncBusyPool(message.getUsername(), sender);
+      removeFromSyncAvailable(message.getUsername(), sender);
+    }
+    logMaps();
+  }
+
+  /**
+   * Moves a connector from the busy pool to the available pool of its kind. A connector that was not
+   * registered as busy is not added to the available pool.
+   */
+  private void freeConnector(FreeConnector message) {
+    LOG.info("About to free connector for job {} and user {}", message.getJobId(), message.getUsername());
+    ActorRef sender = getSender();
+    String username = message.getUsername();
+    if (message.isForAsync()) {
+      Optional<ActorRef> refOptional = removeFromAsyncBusyPool(username, message.getJobId());
+      if (refOptional.isPresent()) {
+        addToAsyncAvailable(username, refOptional.get());
+      }
+    } else {
+      // Was a sync job, remove from sync pool
+      Optional<ActorRef> refOptional = removeFromSyncBusyPool(username, sender);
+      if (refOptional.isPresent()) {
+        addToSyncAvailable(username, refOptional.get());
+      }
+    }
+    logMaps();
+  }
+
+  /**
+   * Dumps the four connection pools to the log at INFO level.
+   */
+  private void logMaps() {
+    if (!LOG.isInfoEnabled()) {
+      return;
+    }
+    LOG.info("Pool status");
+    // try-with-resources closes the stream even when printing one of the maps fails
+    try (LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.INFO)) {
+      PrintStream printer = new PrintStream(out);
+      MapUtils.debugPrint(printer, "Busy Async connections", asyncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Async connections", asyncAvailableConnections);
+      MapUtils.debugPrint(printer, "Busy Sync connections", syncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Sync connections", syncAvailableConnections);
+    } catch (IOException e) {
+      LOG.warn("Cannot close Logging output stream, this may lead to leaks", e);
+    }
+  }
+
+  /**
+   * Removes the connector from the user's busy sync set.
+   *
+   * @return the connector when it was registered as busy, otherwise an absent Optional
+   */
+  private Optional<ActorRef> removeFromSyncBusyPool(String userName, ActorRef refToFree) {
+    Set<ActorRef> actorRefs = syncBusyConnections.get(userName);
+    if (actorRefs != null && actorRefs.remove(refToFree)) {
+      return Optional.of(refToFree);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Removes the job's entry from the user's busy async map.
+   *
+   * @return the connector that was running the job, or an absent Optional when the job is not tracked
+   */
+  private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
+    Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username);
+    if (actors == null) {
+      return Optional.absent();
+    }
+    ActorRefResultContainer removed = actors.remove(jobId);
+    return removed == null ? Optional.<ActorRef>absent() : Optional.fromNullable(removed.actorRef);
+  }
+
+  private void addToAsyncAvailable(String username, ActorRef actor) {
+    addToAvailable(asyncAvailableConnections, username, actor);
+  }
+
+  private void addToSyncAvailable(String username, ActorRef actor) {
+    addToAvailable(syncAvailableConnections, username, actor);
+  }
+
+  /**
+   * Appends the connector to the user's queue in the given pool, creating the queue when missing.
+   */
+  private void addToAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef actor) {
+    Queue<ActorRef> availableActors = pool.get(username);
+    if (availableActors == null) {
+      availableActors = new LinkedList<ActorRef>();
+      pool.put(username, availableActors);
+    }
+    availableActors.add(actor);
+  }
+
+  private void removeFromASyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(asyncAvailableConnections, username, sender);
+  }
+
+  private void removeFromSyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(syncAvailableConnections, username, sender);
+  }
+
+  /**
+   * Removes the connector from the user's queue in the given pool; does nothing when the user has no queue.
+   */
+  private void removeFromAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef sender) {
+    Queue<ActorRef> actors = pool.get(username);
+    if (actors != null) {
+      actors.remove(sender);
+    }
+  }
+
+  /**
+   * Bookkeeping for one running async job: the connector executing it, the result holder once it is
+   * ready, and the failure if one was reported.
+   */
+  private static class ActorRefResultContainer {
+
+    final ActorRef actorRef;
+    boolean hasError = false;
+    Either<ActorRef, ActorRef> result = Either.none();
+    AsyncExecutionFailed error;
+
+    public ActorRefResultContainer(ActorRef actorRef) {
+      this.actorRef = actorRef;
+    }
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
new file mode 100644
index 0000000..e883768
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.Lists;
+import org.apache.ambari.view.hive2.actor.message.CursorReset;
+import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
+import org.apache.ambari.view.hive2.actor.message.ResetCursor;
+import org.apache.ambari.view.hive2.client.ColumnDescription;
+import org.apache.ambari.view.hive2.client.ColumnDescriptionShort;
+import org.apache.ambari.view.hive2.client.Row;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive2.actor.message.job.Next;
+import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems;
+import org.apache.ambari.view.hive2.actor.message.job.Result;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+public class ResultSetIterator extends HiveActor {
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private static final int DEFAULT_BATCH_SIZE = 100;
+ public static final String NULL = "NULL";
+
+ private final ActorRef parent;
+ private final ResultSet resultSet;
+ private final int batchSize;
+
+ private List<ColumnDescription> columnDescriptions;
+ private int columnCount;
+ private Storage storage;
+ boolean async = false;
+ private boolean jobCompleteMessageSent = false;
+
+
+ private boolean metaDataFetched = false;
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize) {
+ this.parent = parent;
+ this.resultSet = resultSet;
+ this.batchSize = batchSize;
+ }
+
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet, Storage storage) {
+ this(parent, resultSet);
+ this.storage = storage;
+ this.async = true;
+ }
+
+ public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
+ this(parent, resultSet, DEFAULT_BATCH_SIZE);
+ }
+
+ @Override
+ void handleMessage(HiveMessage hiveMessage) {
+ LOG.info("Result set Iterator wil handle message {}", hiveMessage);
+ sendKeepAlive();
+ Object message = hiveMessage.getMessage();
+ if (message instanceof Next) {
+ getNext();
+ }
+ if (message instanceof ResetCursor) {
+ resetResultSet();
+ }
+
+ if (message instanceof KeepAlive) {
+ sendKeepAlive();
+ }
+ if (message instanceof AdvanceCursor) {
+ AdvanceCursor moveCursor = (AdvanceCursor) message;
+ advanceCursor(moveCursor);
+ }
+
+ }
+
+ private void advanceCursor(AdvanceCursor moveCursor) {
+ String jobid = moveCursor.getJob();
+ try {
+ // Block here so that we can update the job status
+ resultSet.next();
+ // Resetting the resultset as it needs to fetch from the beginning when the result is asked for.
+ resultSet.beforeFirst();
+ LOG.info("Job execution successful. Setting status in db.");
+ updateJobStatus(jobid, Job.JOB_STATE_FINISHED);
+ sendJobCompleteMessageIfNotDone();
+ } catch (SQLException e) {
+ LOG.error("Failed to reset the cursor after advancing. Setting error state in db.", e);
+ updateJobStatus(jobid, Job.JOB_STATE_ERROR);
+ sender().tell(new FetchFailed("Failed to reset the cursor after advancing", e), self());
+ cleanUpResources();
+ }
+ }
+
+ private void updateJobStatus(String jobid, String status) {
+ try {
+ JobImpl job = storage.load(JobImpl.class, jobid);
+ job.setStatus(status);
+ storage.store(JobImpl.class, job);
+ } catch (ItemNotFound itemNotFound) {
+ // Cannot do anything
+ }
+ }
+
+ private void resetResultSet() {
+ try {
+ resultSet.beforeFirst();
+ sender().tell(new CursorReset(), self());
+ } catch (SQLException e) {
+ LOG.error("Failed to reset the cursor", e);
+ sender().tell(new FetchFailed("Failed to reset the cursor", e), self());
+ cleanUpResources();
+ }
+ }
+
+ private void sendKeepAlive() {
+ LOG.debug("Sending a keep alive to {}", parent);
+ parent.tell(new KeepAlive(), self());
+ }
+
+ private void getNext() {
+ List<Row> rows = Lists.newArrayList();
+ if (!metaDataFetched) {
+ try {
+ initialize();
+ } catch (SQLException ex) {
+ LOG.error("Failed to fetch metadata for the ResultSet", ex);
+ sender().tell(new FetchFailed("Failed to get metadata for ResultSet", ex), self());
+ cleanUpResources();
+ }
+ }
+ int index = 0;
+ try {
+ while (resultSet.next() && index < batchSize) {
+ index++;
+ rows.add(getRowFromResultSet(resultSet));
+ sendJobCompleteMessageIfNotDone();
+ }
+
+ if (index == 0) {
+ // We have hit end of resultSet
+ sender().tell(new NoMoreItems(), self());
+ if(!async) {
+ cleanUpResources();
+ }
+ } else {
+ Result result = new Result(rows, columnDescriptions);
+ sender().tell(result, self());
+ }
+
+ } catch (SQLException ex) {
+ LOG.error("Failed to fetch next batch for the Resultset", ex);
+ sender().tell(new FetchFailed("Failed to fetch next batch for the Resultset", ex), self());
+ cleanUpResources();
+ }
+ }
+
+ private void sendJobCompleteMessageIfNotDone() {
+ if (!jobCompleteMessageSent) {
+ jobCompleteMessageSent = true;
+ parent.tell(new JobExecutionCompleted(), self());
+ }
+ }
+
+ private void cleanUpResources() {
+ parent.tell(new CleanUp(), self());
+ }
+
+ private Row getRowFromResultSet(ResultSet resultSet) throws SQLException {
+ Object[] values = new Object[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ values[i] = resultSet.getObject(i + 1);
+ }
+ return new Row(values);
+ }
+
+ private void initialize() throws SQLException {
+ metaDataFetched = true;
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ columnCount = metaData.getColumnCount();
+ columnDescriptions = Lists.newArrayList();
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = metaData.getColumnName(i);
+ String typeName = metaData.getColumnTypeName(i);
+ ColumnDescription description = new ColumnDescriptionShort(columnName, typeName, i);
+ columnDescriptions.add(description);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
new file mode 100644
index 0000000..a0b6eae
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.SyncJob;
+import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.NoResult;
+import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class SyncJdbcConnector extends JdbcConnector {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private ActorRef resultSetActor = null;
+
+ public SyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
+ super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
+ }
+
+ @Override
+ protected void handleJobMessage(HiveMessage message) {
+ Object job = message.getMessage();
+ if(job instanceof SyncJob) {
+ execute((SyncJob) job);
+ } else if (job instanceof GetColumnMetadataJob) {
+ getColumnMetaData((GetColumnMetadataJob) job);
+ }
+ }
+
+ @Override
+ protected boolean isAsync() {
+ return false;
+ }
+
+ @Override
+ protected void cleanUpChildren() {
+ if(resultSetActor != null && !resultSetActor.isTerminated()) {
+ LOG.debug("Sending poison pill to log aggregator");
+ resultSetActor.tell(PoisonPill.getInstance(), self());
+ }
+ }
+
+ @Override
+ protected void notifyFailure() {
+ sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
+ }
+
+ protected void execute(final SyncJob job) {
+ this.executing = true;
+ executeJob(new Operation<SyncJob>() {
+ @Override
+ SyncJob getJob() {
+ return job;
+ }
+
+ @Override
+ Optional<ResultSet> call(HiveConnection connection) throws SQLException {
+ return connectionDelegate.executeSync(connection, job);
+ }
+
+ @Override
+ String notConnectedErrorMessage() {
+ return "Cannot execute sync job for user: " + job.getUsername() + ". Not connected to Hive";
+ }
+
+ @Override
+ String executionFailedErrorMessage() {
+ return "Failed to execute Jdbc Statement";
+ }
+ });
+ }
+
+
+ private void getColumnMetaData(final GetColumnMetadataJob job) {
+ executeJob(new Operation<GetColumnMetadataJob>() {
+
+ @Override
+ GetColumnMetadataJob getJob() {
+ return job;
+ }
+
+ @Override
+ Optional<ResultSet> call(HiveConnection connection) throws SQLException {
+ return connectionDelegate.getColumnMetadata(connection, job);
+ }
+
+ @Override
+ String notConnectedErrorMessage() {
+ return String.format("Cannot get column metadata for user: %s, schema: %s, table: %s, column: %s" +
+ ". Not connected to Hive", job.getUsername(), job.getSchemaPattern(), job.getTablePattern(),
+ job.getColumnPattern());
+ }
+
+ @Override
+ String executionFailedErrorMessage() {
+ return "Failed to execute Jdbc Statement";
+ }
+ });
+ }
+
+ private void executeJob(Operation operation) {
+ ActorRef sender = this.getSender();
+ String errorMessage = operation.notConnectedErrorMessage();
+ if (connectable == null) {
+ sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
+ cleanUp();
+ return;
+ }
+
+ Optional<HiveConnection> connectionOptional = connectable.getConnection();
+ if (!connectionOptional.isPresent()) {
+ sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
+ cleanUp();
+ return;
+ }
+
+ try {
+ Optional<ResultSet> resultSetOptional = operation.call(connectionOptional.get());
+ if(resultSetOptional.isPresent()) {
+ ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+ resultSetOptional.get()).withDispatcher("akka.actor.result-dispatcher"));
+ deathWatch.tell(new RegisterActor(resultSetActor),self());
+ sender.tell(new ResultSetHolder(resultSetActor), self());
+ } else {
+ sender.tell(new NoResult(), self());
+ cleanUp();
+ }
+ } catch (SQLException e) {
+ LOG.error(operation.executionFailedErrorMessage(), e);
+ sender.tell(new ExecutionFailed(operation.executionFailedErrorMessage(), e), self());
+ cleanUp();
+ }
+ }
+
+ private abstract class Operation<T> {
+ abstract T getJob();
+ abstract Optional<ResultSet> call(HiveConnection connection) throws SQLException;
+ abstract String notConnectedErrorMessage();
+ abstract String executionFailedErrorMessage();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
new file mode 100644
index 0000000..0f918ad
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+
+/**
+ * Actor meant to query YARN/ATS from time to time to fetch the status of the ExecuteJob and
+ * update the database. The handler below is empty, so every incoming message is ignored.
+ */
+public class YarnAtsParser extends HiveActor {
+
+  /**
+   * Accepts a message and does nothing with it.
+   *
+   * @param hiveMessage the incoming message; currently unused
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    // No behaviour has been implemented yet.
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
new file mode 100644
index 0000000..c3e6c04
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message that carries a job value as a string.
+ * NOTE(review): by its name it asks the receiver to advance that job's cursor - confirm against
+ * the actor that handles it.
+ */
+public class AdvanceCursor {
+
+  // Final so the message cannot change after it has been sent.
+  private final String job;
+
+  /**
+   * @param job the job value carried by this message
+   */
+  public AdvanceCursor(String job) {
+    this.job = job;
+  }
+
+  /**
+   * @return the job value given at construction
+   */
+  public String getJob() {
+    return job;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
new file mode 100644
index 0000000..fd1f26f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.base.Optional;
+
+import java.sql.ResultSet;
+
+/**
+ * Message that carries an optional {@link ResultSet}.
+ */
+public class AssignResultSet {
+
+  // Final so the message cannot change after it has been sent.
+  private final Optional<ResultSet> resultSet;
+
+  /**
+   * @param resultSet the result set to carry; may be absent but must not be {@code null},
+   *                  because {@link #getResultSet()} dereferences it
+   */
+  public AssignResultSet(Optional<ResultSet> resultSet) {
+    this.resultSet = resultSet;
+  }
+
+  /**
+   * @return the carried result set, or {@code null} when it is absent
+   */
+  public ResultSet getResultSet() {
+    return resultSet.orNull();
+  }
+
+  @Override
+  public String toString() {
+    // Reports this class's own name; it used to print "ExtractResultSet".
+    return "AssignResultSet{" +
+        "resultSet=" + resultSet +
+        '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
new file mode 100644
index 0000000..85273ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.base.Optional;
+
+import java.sql.Statement;
+
+/**
+ * Message that carries a JDBC {@link Statement}.
+ */
+public class AssignStatement {
+
+  // Named after what it holds (it used to be called resultSet) and final so the message
+  // cannot change after it has been sent.
+  private final Statement statement;
+
+  /**
+   * @param statement the statement to carry
+   */
+  public AssignStatement(Statement statement) {
+    this.statement = statement;
+  }
+
+  /**
+   * @return the statement given at construction
+   */
+  public Statement getStatement() {
+    return statement;
+  }
+
+  @Override
+  public String toString() {
+    // The label now matches the content; it used to read "resultSet=".
+    return "AssignStatement{" +
+        "statement=" + statement +
+        '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
new file mode 100644
index 0000000..6dfd709
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * Message to be sent when a statement has to be executed
+ */
+public class AsyncJob extends DDLJob {
+ private final String jobId;
+ private final String logFile;
+
+ public AsyncJob(String jobId, String username, String[] statements, String logFile,ViewContext viewContext) {
+ super(Type.ASYNC, statements, username,viewContext);
+ this.jobId = jobId;
+ this.logFile = logFile;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getLogFile() {
+ return logFile;
+ }
+
+
+ @Override
+ public String toString() {
+ return "AsyncJob{" +
+ "jobId='" + jobId + '\'' +
+ ", logFile='" + logFile + '\'' +
+ "} " + super.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
new file mode 100644
index 0000000..b859ac1
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.hive2.internal.Connectable;
+import org.apache.ambari.view.hive2.internal.HiveConnectionWrapper;
+
+/**
+ * Connect message to be sent to the Connection Actor; it holds the connection parameters
+ * (username, password and JDBC URL).
+ */
+public class Connect {
+
+  private final String jdbcUrl;
+  private final String username;
+  private final String password;
+
+  /**
+   * @param username user to connect as
+   * @param password password of that user
+   * @param jdbcUrl  JDBC URL to connect to
+   */
+  public Connect(String username, String password, String jdbcUrl) {
+    this.jdbcUrl = jdbcUrl;
+    this.username = username;
+    this.password = password;
+  }
+
+  /**
+   * @return the JDBC URL given at construction
+   */
+  public String getJdbcUrl() {
+    return jdbcUrl;
+  }
+
+  /**
+   * @return the username given at construction
+   */
+  public String getUsername() {
+    return username;
+  }
+
+  /**
+   * @return the password given at construction
+   */
+  public String getPassword() {
+    return password;
+  }
+
+  /**
+   * Builds a new {@link HiveConnectionWrapper} from the JDBC URL, username and password.
+   * Each call creates a fresh wrapper.
+   *
+   * @return a new connectable for these parameters
+   */
+  public Connectable getConnectable() {
+    final String url = getJdbcUrl();
+    return new HiveConnectionWrapper(url, username, password);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
new file mode 100644
index 0000000..d805754
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message without any payload.
+ * NOTE(review): by its name it relates to resetting a result cursor - confirm against the actor
+ * that sends or handles it.
+ */
+public class CursorReset {
+  // No state: the type itself is the whole message.
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
new file mode 100644
index 0000000..7e19a77
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * A {@link HiveJob} that carries an ordered list of statements. Each statement is trimmed and
+ * copied into an internal array when the job is created.
+ */
+public class DDLJob extends HiveJob {
+
+  public static final String SEMICOLON = ";";
+
+  // Assigned once in the constructor and never replaced.
+  private final String[] statements;
+
+  /**
+   * @param type        job type, forwarded to the parent job
+   * @param statements  statements in execution order; must not be {@code null}
+   * @param username    forwarded to the parent job
+   * @param viewContext forwarded to the parent job
+   */
+  public DDLJob(Type type, String[] statements, String username, ViewContext viewContext) {
+    super(type, username, viewContext);
+    this.statements = new String[statements.length];
+    for (int i = 0; i < statements.length; i++) {
+      this.statements[i] = StringUtils.trim(statements[i]);
+    }
+  }
+
+  /**
+   * @return a read-only view of all statements, in order
+   */
+  public Collection<String> getStatements() {
+    // Arrays.asList alone would let a caller overwrite entries of the internal array via List.set.
+    return Collections.unmodifiableList(Arrays.asList(statements));
+  }
+
+  /**
+   * Get the statements to be executed synchronously: every statement except the last one.
+   *
+   * @return all statements but the last; empty when there are fewer than two statements
+   */
+  public Collection<String> getSyncStatements() {
+    if (statements.length <= 1) {
+      return Collections.emptyList();
+    }
+    return ImmutableList.copyOf(Arrays.copyOfRange(statements, 0, statements.length - 1));
+  }
+
+  /**
+   * Get the statement to be executed asynchronously: the last statement.
+   *
+   * @return async statement
+   * @throws ArrayIndexOutOfBoundsException if the job was created with no statements
+   */
+  public String getAsyncStatement() {
+    return statements[statements.length - 1];
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
new file mode 100644
index 0000000..d4d8a1b
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message that pairs the connection parameters ({@link Connect}) with the {@link HiveJob} to run.
+ */
+public class ExecuteJob {
+  public static final String SYNC_JOB_MARKER = "SYNC";
+
+  private final HiveJob job;
+  private final Connect connect;
+
+  /**
+   * @param connect connection parameters
+   * @param job     the job carried by this message
+   */
+  public ExecuteJob(Connect connect, HiveJob job) {
+    this.job = job;
+    this.connect = connect;
+  }
+
+  /**
+   * @return the job given at construction
+   */
+  public HiveJob getJob() {
+    return job;
+  }
+
+  /**
+   * @return the connection parameters given at construction
+   */
+  public Connect getConnect() {
+    return connect;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
new file mode 100644
index 0000000..b3d1599
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message without any payload.
+ * NOTE(review): by its name it triggers execution of a query - confirm against the actor that
+ * handles it.
+ */
+public class ExecuteQuery {
+  // No state: the type itself is the whole message.
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
new file mode 100644
index 0000000..c78dc43
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message that identifies a job by its id together with a username.
+ * NOTE(review): by its name it requests the error of that job - confirm against the actor that
+ * handles it.
+ */
+public class FetchError {
+  private final String username;
+  private final String jobId;
+
+  /**
+   * @param jobId    identifier of the job
+   * @param username user the request is made for
+   */
+  public FetchError(String jobId, String username) {
+    this.username = username;
+    this.jobId = jobId;
+  }
+
+  /**
+   * @return the username given at construction
+   */
+  public String getUsername() {
+    return username;
+  }
+
+  /**
+   * @return the job id given at construction
+   */
+  public String getJobId() {
+    return jobId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
new file mode 100644
index 0000000..6b2ac42
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message that identifies a job by its id together with a username.
+ * NOTE(review): by its name it requests the result of that job - confirm against the actor that
+ * handles it.
+ */
+public class FetchResult {
+  private final String username;
+  private final String jobId;
+
+  /**
+   * @param jobId    identifier of the job
+   * @param username user the request is made for
+   */
+  public FetchResult(String jobId, String username) {
+    this.username = username;
+    this.jobId = jobId;
+  }
+
+  /**
+   * @return the username given at construction
+   */
+  public String getUsername() {
+    return username;
+  }
+
+  /**
+   * @return the job id given at construction
+   */
+  public String getJobId() {
+    return jobId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
new file mode 100644
index 0000000..defa08c
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * A {@link HiveJob} of type {@code SYNC} that carries schema, table and column patterns.
+ * Patterns left out by the shorter constructors default to {@code "*"}.
+ */
+public class GetColumnMetadataJob extends HiveJob {
+  // Pattern used for every part the caller does not specify.
+  private static final String ANY = "*";
+
+  private final String schemaPattern;
+  private final String tablePattern;
+  private final String columnPattern;
+
+  /**
+   * Uses {@code "*"} for the schema, table and column patterns.
+   */
+  public GetColumnMetadataJob(String username, ViewContext viewContext) {
+    this(username, viewContext, ANY, ANY, ANY);
+  }
+
+  /**
+   * Uses {@code "*"} for the schema and table patterns.
+   *
+   * @param columnPattern pattern for the column
+   */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String columnPattern) {
+    this(username, viewContext, ANY, ANY, columnPattern);
+  }
+
+  /**
+   * Uses {@code "*"} for the schema pattern.
+   *
+   * @param tablePattern  pattern for the table
+   * @param columnPattern pattern for the column
+   */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String tablePattern, String columnPattern) {
+    this(username, viewContext, ANY, tablePattern, columnPattern);
+  }
+
+  /**
+   * @param username      forwarded to the parent job
+   * @param viewContext   forwarded to the parent job
+   * @param schemaPattern pattern for the schema
+   * @param tablePattern  pattern for the table
+   * @param columnPattern pattern for the column
+   */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String schemaPattern, String tablePattern, String columnPattern) {
+    super(Type.SYNC, username, viewContext);
+    this.columnPattern = columnPattern;
+    this.tablePattern = tablePattern;
+    this.schemaPattern = schemaPattern;
+  }
+
+  /**
+   * @return the column pattern
+   */
+  public String getColumnPattern() {
+    return columnPattern;
+  }
+
+  /**
+   * @return the table pattern
+   */
+  public String getTablePattern() {
+    return tablePattern;
+  }
+
+  /**
+   * @return the schema pattern
+   */
+  public String getSchemaPattern() {
+    return schemaPattern;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
new file mode 100644
index 0000000..6e084ee
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+
+public class GetMoreLogs {}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
new file mode 100644
index 0000000..ee3c1be
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.ambari.view.ViewContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of the job messages; it holds the job type, the username and the view context.
+ */
+public abstract class HiveJob {
+
+  /** The two kinds of job. */
+  public enum Type {
+    SYNC,
+    ASYNC
+  }
+
+  private final Type type;
+  private final String username;
+  private final ViewContext viewContext;
+
+  /**
+   * @param type        kind of job
+   * @param username    user the job belongs to
+   * @param viewContext view context to carry with the job
+   */
+  public HiveJob(Type type, String username, ViewContext viewContext) {
+    this.viewContext = viewContext;
+    this.username = username;
+    this.type = type;
+  }
+
+  /**
+   * @return the job type given at construction
+   */
+  public Type getType() {
+    return type;
+  }
+
+  /**
+   * @return the username given at construction
+   */
+  public String getUsername() {
+    return username;
+  }
+
+  /**
+   * @return the view context given at construction
+   */
+  public ViewContext getViewContext() {
+    return viewContext;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
new file mode 100644
index 0000000..28533c7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+
+import java.util.UUID;
+
+/**
+ * Message wrapper. Each instance gets its own id, a random UUID string generated when the
+ * wrapper is created.
+ */
+public class HiveMessage {
+
+  // Both fields are final so a wrapper cannot change after it has been sent.
+  private final String id = UUID.randomUUID().toString();
+
+  private final Object message;
+
+  /**
+   * @param message the payload to wrap
+   */
+  public HiveMessage(Object message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the wrapped payload
+   */
+  public Object getMessage() {
+    return message;
+  }
+
+  /**
+   * @return the id generated for this wrapper
+   */
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveMessage{" +
+        "message=" + message +
+        ", id='" + id + '\'' +
+        '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
new file mode 100644
index 0000000..52ba3f5
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class JobExecutionCompleted {}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
new file mode 100644
index 0000000..4f9aab8
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Value holder for a username, a job id and a text message.
+ *
+ * <p>All three fields are final and set once by the constructor.
+ * NOTE(review): judging by the name, the text presumably explains why the job
+ * was rejected; confirm with the sender of this message.
+ */
+public class JobRejected {
+
+  private final String username;
+
+  private final String jobId;
+
+  private final String message;
+
+  /**
+   * @param username the user name to carry
+   * @param jobId    the job identifier to carry
+   * @param message  the text to carry
+   */
+  public JobRejected(String username, String jobId, String message) {
+    this.username = username;
+    this.jobId = jobId;
+    this.message = message;
+  }
+
+  /** @return the user name given at construction */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /** @return the job identifier given at construction */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /** @return the text given at construction */
+  public String getMessage() {
+    return this.message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
new file mode 100644
index 0000000..bc8df2a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Value holder for a username and a job id.
+ *
+ * <p>Both fields are final and set once by the constructor.
+ * NOTE(review): judging by the name, this presumably reports that the job was
+ * submitted; confirm with the sender of this message.
+ */
+public class JobSubmitted {
+
+  private final String username;
+
+  private final String jobId;
+
+  /**
+   * @param username the user name to carry
+   * @param jobId    the job identifier to carry
+   */
+  public JobSubmitted(String username, String jobId) {
+    this.username = username;
+    this.jobId = jobId;
+  }
+
+  /** @return the user name given at construction */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /** @return the job identifier given at construction */
+  public String getJobId() {
+    return this.jobId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
new file mode 100644
index 0000000..bfe37b5
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class LogAggregationFinished {}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
new file mode 100644
index 0000000..9988252
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import akka.actor.ActorRef;
+
+/**
+ * Message that carries a single {@link ActorRef}.
+ *
+ * <p>The reference is final so the message cannot be changed after it has been
+ * created, matching the other immutable messages in this package.
+ * NOTE(review): judging by the name, the receiver presumably registers the
+ * carried actor; confirm against the receiving actor.
+ */
+public class RegisterActor {
+
+  /** The actor reference carried by this message; set once at construction. */
+  private final ActorRef actorRef;
+
+  /**
+   * @param actorRef the actor reference to carry
+   */
+  public RegisterActor(ActorRef actorRef) {
+    this.actorRef = actorRef;
+  }
+
+  /**
+   * @return the actor reference given at construction
+   */
+  public ActorRef getActorRef() {
+    return actorRef;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
new file mode 100644
index 0000000..53276d3
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class ResetCursor {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
new file mode 100644
index 0000000..65de920
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.internal.Either;
+
+/**
+ * Message that combines a job id and username (both handed to the
+ * {@link FetchResult} super constructor) with an {@link Either} holding one of
+ * two actor references.
+ *
+ * <p>NOTE(review): which side of the {@code Either} stands for which outcome is
+ * not visible in this class; check the producer of this message before relying
+ * on a particular meaning.
+ */
+public class ResultReady extends FetchResult {
+
+  /** The carried alternative; it can be replaced through {@link #setResult}. */
+  private Either<ActorRef, ActorRef> result;
+
+  /**
+   * @param jobId    job identifier, forwarded to the super constructor
+   * @param username user name, forwarded to the super constructor
+   * @param result   the alternative to carry
+   */
+  public ResultReady(String jobId, String username, Either<ActorRef, ActorRef> result) {
+    super(jobId, username);
+    this.result = result;
+  }
+
+  /**
+   * @return the alternative currently held by this message
+   */
+  public Either<ActorRef, ActorRef> getResult() {
+    return this.result;
+  }
+
+  /**
+   * Replaces the alternative held by this message.
+   *
+   * @param result the new alternative
+   */
+  public void setResult(Either<ActorRef, ActorRef> result) {
+    this.result = result;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
new file mode 100644
index 0000000..3bae12a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class StartLogAggregation {}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
new file mode 100644
index 0000000..7aece31
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * A {@link DDLJob} whose type is always {@code Type.SYNC}.
+ */
+public class SyncJob extends DDLJob {
+
+  /**
+   * Passes the arguments to the {@link DDLJob} constructor together with the
+   * fixed {@code Type.SYNC} type.
+   *
+   * @param username    user name handed to the super constructor
+   * @param statements  statements handed to the super constructor
+   * @param viewContext view context handed to the super constructor
+   */
+  public SyncJob(String username, String[] statements, ViewContext viewContext) {
+    super(Type.SYNC, statements, username, viewContext);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
new file mode 100644
index 0000000..514d9cd
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * An {@link ExecutionFailed} that additionally records a job id and a username.
+ *
+ * <p>Both extra fields are final and set once by whichever constructor is used.
+ */
+public class AsyncExecutionFailed extends ExecutionFailed {
+
+  private final String jobId;
+
+  private final String username;
+
+  /**
+   * @param jobId    the job identifier to record
+   * @param username the user name to record
+   * @param message  text handed to the super constructor
+   * @param error    throwable handed to the super constructor
+   */
+  public AsyncExecutionFailed(String jobId, String username, String message, Throwable error) {
+    super(message, error);
+    this.jobId = jobId;
+    this.username = username;
+  }
+
+  /**
+   * Variant without an explicit throwable; only the text is handed to the
+   * super constructor.
+   *
+   * @param jobId    the job identifier to record
+   * @param username the user name to record
+   * @param message  text handed to the super constructor
+   */
+  public AsyncExecutionFailed(String jobId, String username, String message) {
+    super(message);
+    this.jobId = jobId;
+    this.username = username;
+  }
+
+  /** @return the job identifier given at construction */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /** @return the user name given at construction */
+  public String getUsername() {
+    return this.username;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
new file mode 100644
index 0000000..dcbf79f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * A {@link Failure} subtype; it adds no state of its own.
+ */
+public class ExecutionFailed extends Failure {
+
+  /**
+   * @param message text handed to the {@link Failure} constructor
+   * @param error   throwable handed to the {@link Failure} constructor
+   */
+  public ExecutionFailed(String message, Throwable error) {
+    super(message, error);
+  }
+
+  /**
+   * Variant for callers without a throwable: a new {@link Exception} built from
+   * the same text is used as the error.
+   *
+   * @param message text used both as the message and for the generated exception
+   */
+  public ExecutionFailed(String message) {
+    this(message, new Exception(message));
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
new file mode 100644
index 0000000..af1e69d
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+public class Failure {
+ private final Throwable error;
+ private final String message;
+
+ public Failure(String message, Throwable error) {
+ this.message = message;
+ this.error = error;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
new file mode 100644
index 0000000..10a2d4e
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * A {@link Failure} subtype; it adds no state of its own.
+ */
+public class FetchFailed extends Failure {
+
+  /**
+   * @param message text handed to the {@link Failure} constructor
+   * @param error   throwable handed to the {@link Failure} constructor
+   */
+  public FetchFailed(String message, Throwable error) {
+    super(message, error);
+  }
+
+  /**
+   * Variant for callers without a throwable: a new {@link Exception} built from
+   * the same text is used as the error.
+   *
+   * @param message text used both as the message and for the generated exception
+   */
+  public FetchFailed(String message) {
+    super(message, new Exception(message));
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
new file mode 100644
index 0000000..bfdc1ea
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+public class Next {
+}