You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/06/27 23:36:45 UTC

[19/34] ambari git commit: AMBARI-17355 & AMBARI-17354: POC: FE & BE changes for first class support for Yarn hosted services

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
new file mode 100644
index 0000000..ac62cf7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
+import org.apache.ambari.view.hive2.actor.message.AsyncJob;
+import org.apache.ambari.view.hive2.actor.message.Connect;
+import org.apache.ambari.view.hive2.actor.message.ExecuteJob;
+import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
+import org.apache.ambari.view.hive2.actor.message.FetchError;
+import org.apache.ambari.view.hive2.actor.message.FetchResult;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.JobRejected;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.actor.message.ResultReady;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive2.internal.ContextSupplier;
+import org.apache.ambari.view.hive2.internal.Either;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.utils.LoggingOutputStream;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.collections4.map.HashedMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Router actor to control the operations. It delegates the operations to underlying child
+ * connector actors and stores the state for them.
+ * <p>
+ * Connectors are tracked per user in four pools: available/busy for asynchronous jobs and
+ * available/busy for synchronous jobs. All pool state is only touched from
+ * {@link #handleMessage(HiveMessage)} and the private helpers it calls.
+ */
+public class OperationController extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final ActorSystem system;
+  private final ActorRef deathWatch;
+  private final ContextSupplier<ConnectionDelegate> connectionSupplier;
+  private final ContextSupplier<Storage> storageSupplier;
+  private final ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier;
+
+  /**
+   * Store the async connections per user which are currently not working
+   */
+  private final Map<String, Queue<ActorRef>> asyncAvailableConnections;
+
+  /**
+   * Store the sync connections per user which are currently not working
+   */
+  private final Map<String, Queue<ActorRef>> syncAvailableConnections;
+
+
+  /**
+   * Store the connection per user/per job which are currently working.
+   */
+  private final Map<String, Map<String, ActorRefResultContainer>> asyncBusyConnections;
+
+  /**
+   * Store the connection per user which will be used to execute sync jobs
+   * like fetching databases, tables etc.
+   */
+  private final Map<String, Set<ActorRef>> syncBusyConnections;
+
+  public OperationController(ActorSystem system,
+                             ActorRef deathWatch,
+                             ContextSupplier<ConnectionDelegate> connectionSupplier,
+                             ContextSupplier<Storage> storageSupplier,
+                             ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
+    this.system = system;
+    this.deathWatch = deathWatch;
+    this.connectionSupplier = connectionSupplier;
+    this.storageSupplier = storageSupplier;
+    this.hdfsApiSupplier = hdfsApiSupplier;
+    this.asyncAvailableConnections = new HashMap<>();
+    this.syncAvailableConnections = new HashMap<>();
+    this.asyncBusyConnections = new HashedMap<>();
+    this.syncBusyConnections = new HashMap<>();
+  }
+
+  /**
+   * Dispatches the payload of the incoming message to the matching handler.
+   * Payloads of any other type are ignored.
+   *
+   * @param hiveMessage wrapper around the actual message
+   */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+
+    if (message instanceof ExecuteJob) {
+      ExecuteJob job = (ExecuteJob) message;
+      if (job.getJob().getType() == HiveJob.Type.ASYNC) {
+        sendJob(job.getConnect(), (AsyncJob) job.getJob());
+      } else if (job.getJob().getType() == HiveJob.Type.SYNC) {
+        sendSyncJob(job.getConnect(), job.getJob());
+      }
+    }
+
+    if (message instanceof ResultReady) {
+      updateResultContainer((ResultReady) message);
+    }
+
+    if (message instanceof AsyncExecutionFailed) {
+      updateResultContainerWithError((AsyncExecutionFailed) message);
+    }
+
+    if (message instanceof GetResultHolder) {
+      getResultHolder((GetResultHolder) message);
+    }
+
+    if (message instanceof FetchResult) {
+      fetchResultActorRef((FetchResult) message);
+    }
+
+    if (message instanceof FetchError) {
+      fetchError((FetchError) message);
+    }
+
+    if (message instanceof FreeConnector) {
+      freeConnector((FreeConnector) message);
+    }
+
+    if (message instanceof DestroyConnector) {
+      destroyConnector((DestroyConnector) message);
+    }
+  }
+
+  /**
+   * Looks up the state kept for a busy async job.
+   *
+   * @return the container, or {@code null} when the user has no such job in the busy pool
+   */
+  private ActorRefResultContainer findContainer(String username, String jobId) {
+    Map<String, ActorRefResultContainer> userJobs = asyncBusyConnections.get(username);
+    return userJobs == null ? null : userJobs.get(jobId);
+  }
+
+  /**
+   * Replies to the sender with an {@link Optional} holding the failure recorded for the job,
+   * or an absent value when no failure is recorded (including when the job is not known).
+   */
+  private void fetchError(FetchError message) {
+    String jobId = message.getJobId();
+    String username = message.getUsername();
+    ActorRefResultContainer container = findContainer(username, jobId);
+    if (container == null) {
+      // The job is not in the busy pool; there is no recorded error to report.
+      LOG.warn("No job {} found for user {} while fetching error", jobId, username);
+      sender().tell(Optional.absent(), self());
+      return;
+    }
+    if (container.hasError) {
+      sender().tell(Optional.of(container.error), self());
+      return;
+    }
+    sender().tell(Optional.absent(), self());
+  }
+
+  /**
+   * Records the failure of an async job in its container. Failures for unknown jobs are logged
+   * and dropped.
+   */
+  private void updateResultContainerWithError(AsyncExecutionFailed message) {
+    String userName = message.getUsername();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(userName, jobId);
+    if (container == null) {
+      LOG.warn("Received failure for unknown job {} of user {}; ignoring", jobId, userName);
+      return;
+    }
+    container.hasError = true;
+    container.error = message;
+  }
+
+  /**
+   * Replies with the result holder of the job, or with a failure on the right side of an
+   * {@link Either} when the job is not in the busy pool.
+   */
+  private void getResultHolder(GetResultHolder message) {
+    String userName = message.getUserName();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(userName, jobId);
+    if (container != null) {
+      sender().tell(container.result, self());
+    } else {
+      Either<ActorRef, AsyncExecutionFailed> right = Either.right(
+        new AsyncExecutionFailed(jobId, userName, "Could not find the job, maybe the pool expired"));
+      sender().tell(right, self());
+    }
+  }
+
+  /**
+   * Stores the result actor of a job in its container and asks that actor to start processing.
+   * Results for jobs that are no longer in the busy pool are logged and dropped.
+   */
+  private void updateResultContainer(ResultReady message) {
+    String jobId = message.getJobId();
+    String username = message.getUsername();
+    ActorRefResultContainer container = findContainer(username, jobId);
+    if (container == null) {
+      LOG.warn("Received result for unknown job {} of user {}; ignoring", jobId, username);
+      return;
+    }
+
+    // set up result actor in container
+    Either<ActorRef, ActorRef> result = message.getResult();
+    container.result = result;
+
+    // start processing
+    if (result.isRight()) {
+      // Query with no result sets to be returned
+      // execute right away
+      result.getRight().tell(new ExecuteQuery(), self());
+    }
+    if (result.isLeft()) {
+      // There is a result set to be processed
+      result.getLeft().tell(new AdvanceCursor(jobId), self());
+    }
+  }
+
+  /**
+   * Replies with the recorded failure if the job has one, otherwise with the Either holding the
+   * result actor. Unknown jobs are answered with an {@link AsyncExecutionFailed}.
+   */
+  private void fetchResultActorRef(FetchResult message) {
+    String username = message.getUsername();
+    String jobId = message.getJobId();
+    ActorRefResultContainer container = findContainer(username, jobId);
+    if (container == null) {
+      sender().tell(
+        new AsyncExecutionFailed(jobId, username, "Could not find the job, maybe the pool expired"),
+        self());
+      return;
+    }
+    if (container.hasError) {
+      sender().tell(container.error, self());
+      return;
+    }
+    sender().tell(container.result, self());
+  }
+
+  /**
+   * Hands an async job to a connector of the user, reusing an available connector when there is
+   * one and creating a new one otherwise. The job is rejected when a job with the same id is
+   * already in progress for the user or when HDFS is not available.
+   */
+  private void sendJob(Connect connect, AsyncJob job) {
+    String username = job.getUsername();
+    String jobId = job.getJobId();
+
+    // Check for a duplicate before touching the pool, so a rejected job neither consumes an
+    // available connector nor creates an untracked one.
+    Map<String, ActorRefResultContainer> userJobs = asyncBusyConnections.get(username);
+    if (userJobs != null && userJobs.containsKey(jobId)) {
+      // Reject this as with the same jobId one connection is already in progress.
+      sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
+      return;
+    }
+
+    // Check if there is available actors to process this
+    ActorRef subActor = getActorRefFromAsyncPool(username);
+    if (subActor == null) {
+      ViewContext viewContext = job.getViewContext();
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, jobId, "Failed to connect to HDFS."), self());
+        return;
+      }
+      subActor = createConnector(AsyncJdbcConnector.class, viewContext, hdfsApiOptional.get(),
+        "jobId:" + jobId + ":-asyncjdbcConnector");
+    }
+
+    if (userJobs == null) {
+      userJobs = new HashMap<>();
+      asyncBusyConnections.put(username, userJobs);
+    }
+    userJobs.put(jobId, new ActorRefResultContainer(subActor));
+
+    // set up the connect with ExecuteJob id for terminations
+    subActor.tell(connect, self());
+    subActor.tell(job, self());
+  }
+
+  /**
+   * Creates a connector actor of the given class on the jdbc connector dispatcher and registers
+   * it with the death watch.
+   */
+  private ActorRef createConnector(Class<?> connectorClass, ViewContext viewContext,
+                                   HdfsApi hdfsApi, String name) {
+    ActorRef connector = system.actorOf(
+      Props.create(connectorClass, viewContext, hdfsApi, system, self(),
+        deathWatch, connectionSupplier.get(viewContext),
+        storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+      name);
+    deathWatch.tell(new RegisterActor(connector), self());
+    return connector;
+  }
+
+  private ActorRef getActorRefFromSyncPool(String username) {
+    return getActorRefFromPool(syncAvailableConnections, username);
+  }
+
+  private ActorRef getActorRefFromAsyncPool(String username) {
+    return getActorRefFromPool(asyncAvailableConnections, username);
+  }
+
+  /**
+   * Takes an available connector of the user out of the pool. An empty queue is created for
+   * users that are not in the pool yet.
+   *
+   * @return the connector, or {@code null} when none is available
+   */
+  private ActorRef getActorRefFromPool(Map<String, Queue<ActorRef>> pool, String username) {
+    Queue<ActorRef> availableActors = pool.get(username);
+    if (availableActors == null) {
+      pool.put(username, new LinkedList<ActorRef>());
+      return null;
+    }
+    // poll() yields null for an empty queue
+    return availableActors.poll();
+  }
+
+  /**
+   * Hands a sync job to a connector of the user, reusing an available connector when there is
+   * one and creating a new one otherwise. The job is rejected when HDFS is not available.
+   */
+  private void sendSyncJob(Connect connect, HiveJob job) {
+    String username = job.getUsername();
+
+    // Check if there is available actors to process this
+    ActorRef subActor = getActorRefFromSyncPool(username);
+    if (subActor == null) {
+      ViewContext viewContext = job.getViewContext();
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
+        return;
+      }
+      subActor = createConnector(SyncJdbcConnector.class, viewContext, hdfsApiOptional.get(),
+        UUID.randomUUID().toString() + ":SyncjdbcConnector");
+    }
+
+    Set<ActorRef> busyActors = syncBusyConnections.get(username);
+    if (busyActors == null) {
+      busyActors = new LinkedHashSet<>();
+      syncBusyConnections.put(username, busyActors);
+    }
+    busyActors.add(subActor);
+
+    // Termination requires that the ref is known in case of sync jobs
+    subActor.tell(connect, self());
+    // The original requester stays the sender of the job message.
+    subActor.tell(job, sender());
+  }
+
+
+  /**
+   * Removes the connector from the busy and the available pool of its kind.
+   */
+  private void destroyConnector(DestroyConnector message) {
+    ActorRef sender = getSender();
+    if (message.isForAsync()) {
+      removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+      removeFromASyncAvailable(message.getUsername(), sender);
+    } else {
+      removeFromSyncBusyPool(message.getUsername(), sender);
+      removeFromSyncAvailable(message.getUsername(), sender);
+    }
+    logMaps();
+  }
+
+  /**
+   * Moves a connector from the busy pool back to the available pool of its kind. Connectors
+   * that are not in the busy pool are left alone.
+   */
+  private void freeConnector(FreeConnector message) {
+    LOG.info("About to free connector for job {} and user {}",message.getJobId(),message.getUsername());
+    Optional<ActorRef> refOptional;
+    if (message.isForAsync()) {
+      refOptional = removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
+      if (refOptional.isPresent()) {
+        addToAsyncAvailable(message.getUsername(), refOptional.get());
+      }
+    } else {
+      // Was a sync job, remove from sync pool
+      refOptional = removeFromSyncBusyPool(message.getUsername(), getSender());
+      if (refOptional.isPresent()) {
+        addToSyncAvailable(message.getUsername(), refOptional.get());
+      }
+    }
+
+    logMaps();
+  }
+
+  /**
+   * Writes the content of all four pools to the log at INFO level.
+   */
+  private void logMaps() {
+    LOG.info("Pool status");
+    LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.INFO);
+    PrintStream printer = new PrintStream(out);
+    try {
+      MapUtils.debugPrint(printer, "Busy Async connections", asyncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Async connections", asyncAvailableConnections);
+      MapUtils.debugPrint(printer, "Busy Sync connections", syncBusyConnections);
+      MapUtils.debugPrint(printer, "Available Sync connections", syncAvailableConnections);
+      printer.flush();
+    } finally {
+      // Close the stream even when printing fails.
+      try {
+        out.close();
+      } catch (IOException e) {
+        LOG.warn("Cannot close Logging output stream, this may lead to leaks", e);
+      }
+    }
+  }
+
+  /**
+   * Removes the connector from the busy sync connectors of the user.
+   *
+   * @return the connector when it was busy, absent otherwise
+   */
+  private Optional<ActorRef> removeFromSyncBusyPool(String userName, ActorRef refToFree) {
+    Set<ActorRef> actorRefs = syncBusyConnections.get(userName);
+    if (actorRefs != null && actorRefs.remove(refToFree)) {
+      return Optional.of(refToFree);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Removes the job from the busy async jobs of the user.
+   *
+   * @return the connector that was working on the job, absent when the job was not found
+   */
+  private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
+    Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username);
+    if (actors == null) {
+      return Optional.absent();
+    }
+    ActorRefResultContainer removed = actors.remove(jobId);
+    if (removed == null) {
+      return Optional.absent();
+    }
+    return Optional.fromNullable(removed.actorRef);
+  }
+
+  private void addToAsyncAvailable(String username, ActorRef actor) {
+    addToAvailable(asyncAvailableConnections, username, actor);
+  }
+
+  private void addToSyncAvailable(String username, ActorRef actor) {
+    addToAvailable(syncAvailableConnections, username, actor);
+  }
+
+  private void addToAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef actor) {
+    Queue<ActorRef> availableActors = pool.get(username);
+    if (availableActors == null) {
+      availableActors = new LinkedList<ActorRef>();
+      pool.put(username, availableActors);
+    }
+    availableActors.add(actor);
+  }
+
+  private void removeFromASyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(asyncAvailableConnections, username, sender);
+  }
+
+  private void removeFromSyncAvailable(String username, ActorRef sender) {
+    removeFromAvailable(syncAvailableConnections, username, sender);
+  }
+
+  private void removeFromAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef sender) {
+    Queue<ActorRef> actors = pool.get(username);
+    if (actors != null) {
+      actors.remove(sender);
+    }
+  }
+
+  /**
+   * State kept for a busy async job: the connector working on it, the result actor once it is
+   * known and the failure, if one was reported.
+   */
+  private static class ActorRefResultContainer {
+
+    final ActorRef actorRef;
+    boolean hasError = false;
+    Either<ActorRef, ActorRef> result = Either.none();
+    AsyncExecutionFailed error;
+
+    public ActorRefResultContainer(ActorRef actorRef) {
+      this.actorRef = actorRef;
+    }
+  }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
new file mode 100644
index 0000000..e883768
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.Lists;
+import org.apache.ambari.view.hive2.actor.message.CursorReset;
+import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
+import org.apache.ambari.view.hive2.actor.message.ResetCursor;
+import org.apache.ambari.view.hive2.client.ColumnDescription;
+import org.apache.ambari.view.hive2.client.ColumnDescriptionShort;
+import org.apache.ambari.view.hive2.client.Row;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive2.actor.message.job.Next;
+import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems;
+import org.apache.ambari.view.hive2.actor.message.job.Result;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Actor that iterates over a JDBC {@link ResultSet} in batches and sends the rows to the
+ * requester. Every handled message also sends a keep alive to the parent.
+ */
+public class ResultSetIterator extends HiveActor {
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private static final int DEFAULT_BATCH_SIZE = 100;
+  public static final String NULL = "NULL";
+
+  private final ActorRef parent;
+  private final ResultSet resultSet;
+  private final int batchSize;
+
+  private List<ColumnDescription> columnDescriptions;
+  private int columnCount;
+  // Only set by the constructor used for async jobs.
+  private Storage storage;
+  boolean async = false;
+  private boolean jobCompleteMessageSent = false;
+
+
+  private boolean metaDataFetched = false;
+
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize) {
+    this.parent = parent;
+    this.resultSet = resultSet;
+    this.batchSize = batchSize;
+  }
+
+
+  /**
+   * Creates an iterator for an async job; the storage is used to update the job status.
+   */
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, Storage storage) {
+    this(parent, resultSet);
+    this.storage = storage;
+    this.async = true;
+  }
+
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
+    this(parent, resultSet, DEFAULT_BATCH_SIZE);
+  }
+
+  /**
+   * Sends a keep alive to the parent and then handles {@link Next}, {@link ResetCursor} and
+   * {@link AdvanceCursor} payloads. A {@link KeepAlive} payload needs no handling of its own
+   * because the keep alive has already been forwarded at that point.
+   */
+  @Override
+  void handleMessage(HiveMessage hiveMessage) {
+    LOG.info("Result set Iterator wil handle message {}", hiveMessage);
+    sendKeepAlive();
+    Object message = hiveMessage.getMessage();
+    if (message instanceof Next) {
+      getNext();
+    }
+    if (message instanceof ResetCursor) {
+      resetResultSet();
+    }
+    if (message instanceof AdvanceCursor) {
+      advanceCursor((AdvanceCursor) message);
+    }
+  }
+
+  /**
+   * Moves the cursor once and back to the start, then stores the resulting job status.
+   */
+  private void advanceCursor(AdvanceCursor moveCursor) {
+    String jobid = moveCursor.getJob();
+    try {
+      // Block here so that we can update the job status
+      resultSet.next();
+      // Resetting the resultset as it needs to fetch from the beginning when the result is asked for.
+      resultSet.beforeFirst();
+      LOG.info("Job execution successful. Setting status in db.");
+      updateJobStatus(jobid, Job.JOB_STATE_FINISHED);
+      sendJobCompleteMessageIfNotDone();
+    } catch (SQLException e) {
+      LOG.error("Failed to reset the cursor after advancing. Setting error state in db.", e);
+      updateJobStatus(jobid, Job.JOB_STATE_ERROR);
+      sender().tell(new FetchFailed("Failed to reset the cursor after advancing", e), self());
+      cleanUpResources();
+    }
+  }
+
+  /**
+   * Stores the given status on the job. Nothing is stored when this iterator has no storage or
+   * the job cannot be found; both cases are logged.
+   */
+  private void updateJobStatus(String jobid, String status) {
+    if (storage == null) {
+      LOG.warn("No storage available, cannot set status {} for job {}", status, jobid);
+      return;
+    }
+    try {
+      JobImpl job = storage.load(JobImpl.class, jobid);
+      job.setStatus(status);
+      storage.store(JobImpl.class, job);
+    } catch (ItemNotFound itemNotFound) {
+      // Cannot do anything but report it
+      LOG.warn("Job " + jobid + " not found, cannot set status " + status, itemNotFound);
+    }
+  }
+
+  private void resetResultSet() {
+    try {
+      resultSet.beforeFirst();
+      sender().tell(new CursorReset(), self());
+    } catch (SQLException e) {
+      LOG.error("Failed to reset the cursor", e);
+      sender().tell(new FetchFailed("Failed to reset the cursor", e), self());
+      cleanUpResources();
+    }
+  }
+
+  private void sendKeepAlive() {
+    LOG.debug("Sending a keep alive to {}", parent);
+    parent.tell(new KeepAlive(), self());
+  }
+
+  /**
+   * Replies to the sender with the next batch of at most {@code batchSize} rows, with
+   * {@link NoMoreItems} when no row was read, or with {@link FetchFailed} on an SQL error.
+   */
+  private void getNext() {
+    if (!metaDataFetched) {
+      try {
+        initialize();
+      } catch (SQLException ex) {
+        LOG.error("Failed to fetch metadata for the ResultSet", ex);
+        sender().tell(new FetchFailed("Failed to get metadata for ResultSet", ex), self());
+        cleanUpResources();
+        // The failure has been reported; do not send a second reply.
+        return;
+      }
+    }
+
+    List<Row> rows = Lists.newArrayList();
+    int index = 0;
+    try {
+      // Check the batch limit first: calling next() on a full batch would move the cursor past
+      // a row that is never returned.
+      while (index < batchSize && resultSet.next()) {
+        index++;
+        rows.add(getRowFromResultSet(resultSet));
+        sendJobCompleteMessageIfNotDone();
+      }
+
+      if (index == 0) {
+        // We have hit end of resultSet
+        sender().tell(new NoMoreItems(), self());
+        if(!async) {
+          cleanUpResources();
+        }
+      } else {
+        Result result = new Result(rows, columnDescriptions);
+        sender().tell(result, self());
+      }
+
+    } catch (SQLException ex) {
+      LOG.error("Failed to fetch next batch for the Resultset", ex);
+      sender().tell(new FetchFailed("Failed to fetch next batch for the Resultset", ex), self());
+      cleanUpResources();
+    }
+  }
+
+  private void sendJobCompleteMessageIfNotDone() {
+    if (!jobCompleteMessageSent) {
+      jobCompleteMessageSent = true;
+      parent.tell(new JobExecutionCompleted(), self());
+    }
+  }
+
+  private void cleanUpResources() {
+    parent.tell(new CleanUp(), self());
+  }
+
+  private Row getRowFromResultSet(ResultSet resultSet) throws SQLException {
+    Object[] values = new Object[columnCount];
+    for (int i = 0; i < columnCount; i++) {
+      // JDBC columns are 1-based
+      values[i] = resultSet.getObject(i + 1);
+    }
+    return new Row(values);
+  }
+
+  /**
+   * Reads the column count and descriptions from the result set metadata.
+   *
+   * @throws SQLException when the metadata cannot be read; the metadata then still counts as
+   *                      not fetched
+   */
+  private void initialize() throws SQLException {
+    ResultSetMetaData metaData = resultSet.getMetaData();
+    int count = metaData.getColumnCount();
+    List<ColumnDescription> descriptions = Lists.newArrayList();
+    for (int i = 1; i <= count; i++) {
+      String columnName = metaData.getColumnName(i);
+      String typeName = metaData.getColumnTypeName(i);
+      ColumnDescription description = new ColumnDescriptionShort(columnName, typeName, i);
+      descriptions.add(description);
+    }
+    // Publish the state only once everything has been read successfully.
+    columnCount = count;
+    columnDescriptions = descriptions;
+    metaDataFetched = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
new file mode 100644
index 0000000..a0b6eae
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.SyncJob;
+import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.NoResult;
+import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Connector actor for synchronous jobs. A job is executed on the Hive connection and the sender
+ * receives either a {@link ResultSetHolder} wrapping a {@link ResultSetIterator} child actor,
+ * a {@link NoResult} or an {@link ExecutionFailed}.
+ */
+public class SyncJdbcConnector extends JdbcConnector {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+  /** Child actor iterating over the result set of the last executed job, if any. */
+  private ActorRef resultSetActor = null;
+
+  public SyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
+    super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
+  }
+
+  @Override
+  protected void handleJobMessage(HiveMessage message) {
+    Object job = message.getMessage();
+    if(job instanceof SyncJob) {
+      execute((SyncJob) job);
+    } else if (job instanceof GetColumnMetadataJob) {
+      getColumnMetaData((GetColumnMetadataJob) job);
+    }
+  }
+
+  @Override
+  protected boolean isAsync() {
+    return false;
+  }
+
+  /**
+   * Stops the result set child actor if one was created and it is still alive.
+   */
+  @Override
+  protected void cleanUpChildren() {
+    if(resultSetActor != null && !resultSetActor.isTerminated()) {
+      LOG.debug("Sending poison pill to result set iterator");
+      resultSetActor.tell(PoisonPill.getInstance(), self());
+    }
+  }
+
+  @Override
+  protected void notifyFailure() {
+    sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
+  }
+
+  protected void execute(final SyncJob job) {
+    // NOTE(review): only this path sets the executing flag, getColumnMetaData does not -
+    // confirm against JdbcConnector whether that is intended.
+    this.executing = true;
+    executeJob(new Operation<SyncJob>() {
+      @Override
+      SyncJob getJob() {
+        return job;
+      }
+
+      @Override
+      Optional<ResultSet> call(HiveConnection connection) throws SQLException {
+        return connectionDelegate.executeSync(connection, job);
+      }
+
+      @Override
+      String notConnectedErrorMessage() {
+        return "Cannot execute sync job for user: " + job.getUsername() + ". Not connected to Hive";
+      }
+
+      @Override
+      String executionFailedErrorMessage() {
+        return "Failed to execute Jdbc Statement";
+      }
+    });
+  }
+
+
+  private void getColumnMetaData(final GetColumnMetadataJob job) {
+    executeJob(new Operation<GetColumnMetadataJob>() {
+
+      @Override
+      GetColumnMetadataJob getJob() {
+        return job;
+      }
+
+      @Override
+      Optional<ResultSet> call(HiveConnection connection) throws SQLException {
+        return connectionDelegate.getColumnMetadata(connection, job);
+      }
+
+      @Override
+      String notConnectedErrorMessage() {
+        return String.format("Cannot get column metadata for user: %s, schema: %s, table: %s, column: %s" +
+            ". Not connected to Hive", job.getUsername(), job.getSchemaPattern(), job.getTablePattern(),
+          job.getColumnPattern());
+      }
+
+      @Override
+      String executionFailedErrorMessage() {
+        return "Failed to execute Jdbc Statement";
+      }
+    });
+  }
+
+  /**
+   * Runs the operation on the Hive connection and replies to the sender. Every outcome except
+   * a returned result set triggers {@code cleanUp()}.
+   *
+   * @param operation the work to run together with its error messages
+   */
+  private void executeJob(Operation<?> operation) {
+    ActorRef sender = this.getSender();
+    String errorMessage = operation.notConnectedErrorMessage();
+    if (connectable == null) {
+      sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
+      cleanUp();
+      return;
+    }
+
+    Optional<HiveConnection> connectionOptional = connectable.getConnection();
+    if (!connectionOptional.isPresent()) {
+      sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
+      cleanUp();
+      return;
+    }
+
+    try {
+      Optional<ResultSet> resultSetOptional = operation.call(connectionOptional.get());
+      if(resultSetOptional.isPresent()) {
+        // Keep the child in the field so that cleanUpChildren() can stop it later.
+        resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+          resultSetOptional.get()).withDispatcher("akka.actor.result-dispatcher"));
+        deathWatch.tell(new RegisterActor(resultSetActor),self());
+        sender.tell(new ResultSetHolder(resultSetActor), self());
+      } else {
+        sender.tell(new NoResult(), self());
+        cleanUp();
+      }
+    } catch (SQLException e) {
+      LOG.error(operation.executionFailedErrorMessage(), e);
+      sender.tell(new ExecutionFailed(operation.executionFailedErrorMessage(), e), self());
+      cleanUp();
+    }
+  }
+
+  /**
+   * A unit of work against a Hive connection together with the messages used to report its
+   * failures.
+   *
+   * @param <T> type of the job the operation is created for
+   */
+  private abstract static class Operation<T> {
+    abstract T getJob();
+    abstract Optional<ResultSet> call(HiveConnection connection) throws SQLException;
+    abstract String notConnectedErrorMessage();
+    abstract String executionFailedErrorMessage();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
new file mode 100644
index 0000000..0f918ad
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+
+/**
+ * Actor intended to query YARN/ATS from time to time to fetch the status of the
+ * ExecuteJob and update the database.
+ * The handler is currently an empty stub: every message is accepted and ignored.
+ */
+public class YarnAtsParser extends HiveActor {
+  /** No-op for now; the incoming message is not inspected. */
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
new file mode 100644
index 0000000..c3e6c04
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Immutable message that names a job.
+ * NOTE(review): judging by the class name it asks the receiver to advance that job's cursor;
+ * the handling actor is not part of this file — confirm there.
+ */
+public class AdvanceCursor {
+
+    // final: messages travel between actors and must not change after construction
+    private final String job;
+
+    /**
+     * @param job the job value to carry; stored as given, no validation is applied
+     */
+    public AdvanceCursor(String job) {
+        this.job = job;
+    }
+
+    /**
+     * @return the job value supplied at construction
+     */
+    public String getJob() {
+        return job;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
new file mode 100644
index 0000000..fd1f26f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.base.Optional;
+
+import java.sql.ResultSet;
+
+/**
+ * Immutable message that hands an optional JDBC {@link ResultSet} to its receiver.
+ */
+public class AssignResultSet {
+
+    private final Optional<ResultSet> resultSet;
+
+    /**
+     * @param resultSet the result set to assign, or an absent Optional when there is none;
+     *                  the Optional itself must not be {@code null} because
+     *                  {@link #getResultSet()} dereferences it
+     */
+    public AssignResultSet(Optional<ResultSet> resultSet) {
+        this.resultSet = resultSet;
+    }
+
+    /**
+     * @return the wrapped result set, or {@code null} when none was supplied
+     */
+    public ResultSet getResultSet() {
+        return resultSet.orNull();
+    }
+
+    @Override
+    public String toString() {
+        // The label now matches this class; it used to read "ExtractResultSet".
+        return "AssignResultSet{" +
+                "resultSet=" + resultSet +
+                '}';
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
new file mode 100644
index 0000000..85273ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.base.Optional;
+
+import java.sql.Statement;
+
+/**
+ * Immutable message that hands a JDBC {@link Statement} to its receiver.
+ */
+public class AssignStatement {
+
+    // Named after what it holds; the field was previously called "resultSet".
+    private final Statement statement;
+
+    /**
+     * @param statement the statement to carry; stored as given
+     */
+    public AssignStatement(Statement statement) {
+        this.statement = statement;
+    }
+
+    /**
+     * @return the statement supplied at construction
+     */
+    public Statement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public String toString() {
+        return "AssignStatement{" +
+                "statement=" + statement +
+                '}';
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
new file mode 100644
index 0000000..6dfd709
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * Message to be sent when a statement has to be executed
+ */
+public class AsyncJob extends DDLJob {
+  private final String jobId;
+  private final String logFile;
+
+  public AsyncJob(String jobId, String username, String[] statements, String logFile,ViewContext viewContext) {
+    super(Type.ASYNC, statements, username,viewContext);
+    this.jobId = jobId;
+    this.logFile = logFile;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getLogFile() {
+    return logFile;
+  }
+
+
+  @Override
+  public String toString() {
+    return "AsyncJob{" +
+            "jobId='" + jobId + '\'' +
+            ", logFile='" + logFile + '\'' +
+            "} " + super.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
new file mode 100644
index 0000000..b859ac1
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.hive2.internal.Connectable;
+import org.apache.ambari.view.hive2.internal.HiveConnectionWrapper;
+
+/**
+ * Connect message to be sent to the Connection Actor; it carries the user name,
+ * password and JDBC URL and can build a {@link Connectable} from them.
+ */
+public class Connect {
+
+  private final String jdbcUrl;
+  private final String username;
+  private final String password;
+
+  /**
+   * @param username user to connect as
+   * @param password password of that user
+   * @param jdbcUrl  JDBC URL to connect to
+   */
+  public Connect(String username, String password, String jdbcUrl) {
+    this.jdbcUrl = jdbcUrl;
+    this.username = username;
+    this.password = password;
+  }
+
+  /** @return the JDBC URL given at construction */
+  public String getJdbcUrl() {
+    return this.jdbcUrl;
+  }
+
+  /** @return the user name given at construction */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /** @return the password given at construction */
+  public String getPassword() {
+    return this.password;
+  }
+
+  /**
+   * Creates a fresh {@link HiveConnectionWrapper} from the stored parameters on every call.
+   *
+   * @return a new connectable for the URL and credentials of this message
+   */
+  public Connectable getConnectable(){
+    final String url = getJdbcUrl();
+    return new HiveConnectionWrapper(url, username, password);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
new file mode 100644
index 0000000..d805754
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/CursorReset.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Marker message without any payload.
+ * NOTE(review): presumably tells the receiver to reset a result-set cursor — confirm against the handling actor.
+ */
+public class CursorReset {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
new file mode 100644
index 0000000..7e19a77
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * Hive job made of an ordered list of statements. Each statement is trimmed when the
+ * job is created. All statements except the last are exposed as "sync" statements,
+ * the last one as the "async" statement.
+ */
+public class DDLJob extends HiveJob {
+
+  public static final String SEMICOLON = ";";
+  private final String[] statements;
+
+  /**
+   * @param type        job type, passed on to {@link HiveJob}
+   * @param statements  statements in execution order; must not be {@code null}. The values are
+   *                    copied (trimmed) into a private array, so later changes to the caller's
+   *                    array do not affect this job.
+   * @param username    user the job belongs to, passed on to {@link HiveJob}
+   * @param viewContext view context, passed on to {@link HiveJob}
+   */
+  public DDLJob(Type type, String[] statements, String username, ViewContext viewContext) {
+    super(type, username, viewContext);
+    this.statements = new String[statements.length];
+    for (int i = 0; i < statements.length; i++) {
+      this.statements[i] = clean(statements[i]);
+    }
+  }
+
+  /** Trims the statement via {@code StringUtils.trim}. */
+  private String clean(String statement) {
+    return StringUtils.trim(statement);
+  }
+
+  /**
+   * Get all statements of this job in order.
+   *
+   * @return read-only view of the statements
+   */
+  public Collection<String> getStatements() {
+    // Arrays.asList alone writes through to the backing array; wrap it so that
+    // callers cannot change the job's statements.
+    return Collections.unmodifiableList(Arrays.asList(statements));
+  }
+
+  /**
+   * Get the statements to be executed synchronously
+   *
+   * @return every statement except the last one; empty when the job has at most one statement
+   */
+  public Collection<String> getSyncStatements() {
+    if (statements.length <= 1) {
+      return Collections.emptyList();
+    }
+    return ImmutableList.copyOf(Arrays.copyOfRange(statements, 0, statements.length - 1));
+  }
+
+  /**
+   * Get the statement to be executed asynchronously
+   *
+   * @return async statement, i.e. the last statement of the job
+   * @throws ArrayIndexOutOfBoundsException if the job was created with no statements
+   */
+  public String getAsyncStatement() {
+    return statements[statements.length - 1];
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
new file mode 100644
index 0000000..d4d8a1b
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteJob.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message that pairs a {@link HiveJob} with the {@link Connect} parameters supplied with it.
+ */
+public class ExecuteJob {
+  public static final String SYNC_JOB_MARKER = "SYNC";
+
+  private final HiveJob job;
+  private final Connect connect;
+
+  /**
+   * @param connect connection parameters
+   * @param job     the job
+   */
+  public ExecuteJob(Connect connect, HiveJob job) {
+    this.job = job;
+    this.connect = connect;
+  }
+
+  /** @return the job given at construction */
+  public HiveJob getJob() {
+    return this.job;
+  }
+
+  /** @return the connection parameters given at construction */
+  public Connect getConnect() {
+    return this.connect;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
new file mode 100644
index 0000000..b3d1599
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ExecuteQuery.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Marker message without any payload.
+ * NOTE(review): presumably triggers the execution of a query — confirm against the handling actor.
+ */
+public class ExecuteQuery {
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
new file mode 100644
index 0000000..c78dc43
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchError.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message identifying a job (by id) together with a user name.
+ * NOTE(review): judging by the class name this asks for the error of that job; the handling
+ * actor is not part of this file — confirm there.
+ */
+public class FetchError {
+    private final String username;
+    private final String jobId;
+
+    /**
+     * @param jobId    id of the job
+     * @param username name of the user
+     */
+    public FetchError(String jobId, String username) {
+        this.username = username;
+        this.jobId = jobId;
+    }
+
+    /** @return the user name given at construction */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /** @return the job id given at construction */
+    public String getJobId() {
+        return this.jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
new file mode 100644
index 0000000..6b2ac42
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/FetchResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Message identifying a job (by id) together with a user name.
+ * NOTE(review): judging by the class name this asks for the result of that job; the handling
+ * actor is not part of this file — confirm there.
+ */
+public class FetchResult {
+    private final String username;
+    private final String jobId;
+
+    /**
+     * @param jobId    id of the job
+     * @param username name of the user
+     */
+    public FetchResult(String jobId, String username) {
+        this.username = username;
+        this.jobId = jobId;
+    }
+
+    /** @return the user name given at construction */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /** @return the job id given at construction */
+    public String getJobId() {
+        return this.jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
new file mode 100644
index 0000000..defa08c
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * {@code SYNC} job carrying schema, table and column patterns.
+ * Patterns that a constructor does not take default to {@code "*"}.
+ */
+public class GetColumnMetadataJob extends HiveJob {
+  /** Default used for every pattern that is not passed explicitly. */
+  private static final String ANY = "*";
+
+  private final String schemaPattern;
+  private final String tablePattern;
+  private final String columnPattern;
+
+  /** Creates a job with all three patterns defaulted. */
+  public GetColumnMetadataJob(String username, ViewContext viewContext) {
+    this(username, viewContext, ANY, ANY, ANY);
+  }
+
+  /** Creates a job with the given column pattern; schema and table patterns are defaulted. */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String columnPattern) {
+    this(username, viewContext, ANY, ANY, columnPattern);
+  }
+
+  /** Creates a job with the given table and column patterns; the schema pattern is defaulted. */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String tablePattern, String columnPattern) {
+    this(username, viewContext, ANY, tablePattern, columnPattern);
+  }
+
+  /**
+   * Creates a job with all patterns given explicitly.
+   *
+   * @param username      user the job belongs to
+   * @param viewContext   view context
+   * @param schemaPattern schema pattern
+   * @param tablePattern  table pattern
+   * @param columnPattern column pattern
+   */
+  public GetColumnMetadataJob(String username, ViewContext viewContext,
+                              String schemaPattern, String tablePattern, String columnPattern) {
+    super(Type.SYNC, username, viewContext);
+    this.columnPattern = columnPattern;
+    this.tablePattern = tablePattern;
+    this.schemaPattern = schemaPattern;
+  }
+
+  /** @return the schema pattern */
+  public String getSchemaPattern() {
+    return this.schemaPattern;
+  }
+
+  /** @return the table pattern */
+  public String getTablePattern() {
+    return this.tablePattern;
+  }
+
+  /** @return the column pattern */
+  public String getColumnPattern() {
+    return this.columnPattern;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
new file mode 100644
index 0000000..6e084ee
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetMoreLogs.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+
+/**
+ * Marker message without any payload.
+ * NOTE(review): presumably asks the receiver for further log output — confirm against the handling actor.
+ */
+public class GetMoreLogs {}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
new file mode 100644
index 0000000..ee3c1be
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.ambari.view.ViewContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of the job messages; it carries the job type, the user name and the view context.
+ */
+public abstract class HiveJob {
+
+  /** Kind of job: {@code SYNC} or {@code ASYNC}. */
+  public enum Type {
+    SYNC,
+    ASYNC
+  }
+
+  private final Type type;
+  private final String username;
+  private final ViewContext viewContext;
+
+  /**
+   * @param type        kind of job
+   * @param username    user the job belongs to
+   * @param viewContext view context of the job
+   */
+  public HiveJob(Type type, String username,ViewContext viewContext) {
+    this.viewContext = viewContext;
+    this.username = username;
+    this.type = type;
+  }
+
+  /** @return the kind of job given at construction */
+  public Type getType() {
+    return this.type;
+  }
+
+  /** @return the user name given at construction */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /** @return the view context given at construction */
+  public ViewContext getViewContext() {
+    return this.viewContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
new file mode 100644
index 0000000..28533c7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+
+import java.util.UUID;
+
+/**
+ * Message wrapper. Each instance gets its own random UUID string as id when it is
+ * created and keeps both the id and the wrapped payload unchanged afterwards.
+ */
+public class HiveMessage {
+
+    // final: the wrapper is passed between actors and must not change after construction
+    private final String id = UUID.randomUUID().toString();
+
+    private final Object message;
+
+    /**
+     * @param message the payload to wrap; stored as given
+     */
+    public HiveMessage(Object message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the wrapped payload
+     */
+    public Object getMessage() {
+        return message;
+    }
+
+    /**
+     * @return the id generated for this wrapper
+     */
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String toString() {
+        return "HiveMessage{" +
+                "message=" + message +
+                ", id='" + id + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
new file mode 100644
index 0000000..52ba3f5
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class JobExecutionCompleted {}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
new file mode 100644
index 0000000..4f9aab8
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobRejected.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Immutable message carrying a user name, a job id and a message text.
+ * NOTE(review): judging by the class name it reports that a job was rejected;
+ * confirm against the sender.
+ */
+public class JobRejected {
+
+  /** User name supplied at construction. */
+  private final String username;
+  /** Job id supplied at construction. */
+  private final String jobId;
+  /** Message text supplied at construction. */
+  private final String message;
+
+  /**
+   * Stores the three values unchanged.
+   *
+   * @param username the user name
+   * @param jobId    the job id
+   * @param message  the accompanying text
+   */
+  public JobRejected(String username, String jobId, String message) {
+    this.message = message;
+    this.jobId = jobId;
+    this.username = username;
+  }
+
+  /**
+   * @return the user name given to the constructor
+   */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /**
+   * @return the job id given to the constructor
+   */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /**
+   * @return the message text given to the constructor
+   */
+  public String getMessage() {
+    return this.message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
new file mode 100644
index 0000000..bc8df2a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobSubmitted.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+/**
+ * Immutable message carrying a user name and a job id.
+ * NOTE(review): judging by the class name it reports that a job was submitted;
+ * confirm against the sender.
+ */
+public class JobSubmitted {
+
+  /** User name supplied at construction. */
+  private final String username;
+  /** Job id supplied at construction. */
+  private final String jobId;
+
+  /**
+   * Stores both values unchanged.
+   *
+   * @param username the user name
+   * @param jobId    the job id
+   */
+  public JobSubmitted(String username, String jobId) {
+    this.jobId = jobId;
+    this.username = username;
+  }
+
+  /**
+   * @return the user name given to the constructor
+   */
+  public String getUsername() {
+    return this.username;
+  }
+
+  /**
+   * @return the job id given to the constructor
+   */
+  public String getJobId() {
+    return this.jobId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
new file mode 100644
index 0000000..bfe37b5
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/LogAggregationFinished.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class LogAggregationFinished {}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
new file mode 100644
index 0000000..9988252
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RegisterActor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import akka.actor.ActorRef;
+
+/**
+ * Message carrying a single {@link ActorRef}.
+ * <p>
+ * The reference is assigned once in the constructor and never changed, so the
+ * field is final; Akka recommends that messages passed between actors be immutable.
+ * NOTE(review): the name suggests the receiver is asked to register the referenced
+ * actor; confirm against the actor that handles this message.
+ */
+public class RegisterActor {
+
+    /** The actor reference supplied at construction. */
+    private final ActorRef actorRef;
+
+    /**
+     * @param actorRef the actor reference to carry; stored as-is
+     */
+    public RegisterActor(ActorRef actorRef) {
+        this.actorRef = actorRef;
+    }
+
+    /**
+     * @return the actor reference given to the constructor
+     */
+    public ActorRef getActorRef() {
+        return actorRef;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
new file mode 100644
index 0000000..53276d3
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResetCursor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class ResetCursor {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
new file mode 100644
index 0000000..65de920
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.internal.Either;
+
+/**
+ * A {@link FetchResult} that additionally carries an {@link Either} of two
+ * {@link ActorRef}s. The job id and user name are passed straight to the
+ * {@link FetchResult} constructor.
+ * <p>
+ * NOTE(review): what the left and right sides of the {@code Either} stand for is
+ * not visible in this class; confirm with the code that builds and consumes it.
+ */
+public class ResultReady extends FetchResult {
+
+    /** Current result value; replaceable through {@link #setResult}. */
+    private Either<ActorRef, ActorRef> result;
+
+    /**
+     * @param jobId    job id, forwarded to the parent constructor
+     * @param username user name, forwarded to the parent constructor
+     * @param result   initial result value; stored as-is
+     */
+    public ResultReady(String jobId, String username, Either<ActorRef, ActorRef> result) {
+        super(jobId, username);
+        this.result = result;
+    }
+
+    /**
+     * @return the result value currently held
+     */
+    public Either<ActorRef, ActorRef> getResult() {
+        return this.result;
+    }
+
+    /**
+     * Replaces the held result value.
+     *
+     * @param result the new value; stored as-is
+     */
+    public void setResult(Either<ActorRef, ActorRef> result) {
+        this.result = result;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
new file mode 100644
index 0000000..3bae12a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+public class StartLogAggregation {}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
new file mode 100644
index 0000000..7aece31
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message;
+
+import org.apache.ambari.view.ViewContext;
+
+/**
+ * A {@link DDLJob} that is always constructed with {@code Type.SYNC}.
+ */
+public class SyncJob extends DDLJob {
+
+  /**
+   * Forwards all arguments to the {@link DDLJob} constructor together with
+   * {@code Type.SYNC}.
+   *
+   * @param username    user name handed to the parent constructor
+   * @param statements  statements handed to the parent constructor
+   * @param viewContext view context handed to the parent constructor
+   */
+  public SyncJob(String username, String[] statements, ViewContext viewContext) {
+    super(Type.SYNC, statements, username, viewContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
new file mode 100644
index 0000000..514d9cd
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/AsyncExecutionFailed.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * An {@link ExecutionFailed} that also records a job id and a user name.
+ */
+public class AsyncExecutionFailed extends ExecutionFailed {
+  /** Job id supplied at construction. */
+  private final String jobId;
+  /** User name supplied at construction. */
+  private final String username;
+
+  /**
+   * Creates a failure with an explicit cause.
+   *
+   * @param jobId    the job id
+   * @param username the user name
+   * @param message  failure message, forwarded to the parent constructor
+   * @param error    cause, forwarded to the parent constructor
+   */
+  public AsyncExecutionFailed(String jobId, String username, String message, Throwable error) {
+    super(message, error);
+    this.username = username;
+    this.jobId = jobId;
+  }
+
+  /**
+   * Creates a failure from a message only, delegating to the single-argument
+   * parent constructor.
+   *
+   * @param jobId    the job id
+   * @param username the user name
+   * @param message  failure message, forwarded to the parent constructor
+   */
+  public AsyncExecutionFailed(String jobId, String username, String message) {
+    super(message);
+    this.username = username;
+    this.jobId = jobId;
+  }
+
+  /**
+   * @return the job id given to the constructor
+   */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /**
+   * @return the user name given to the constructor
+   */
+  public String getUsername() {
+    return this.username;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
new file mode 100644
index 0000000..dcbf79f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecutionFailed.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * A {@link Failure} subtype; it adds no state or behaviour of its own, only
+ * two constructors.
+ */
+public class ExecutionFailed extends Failure {
+
+  /**
+   * @param message failure message, forwarded to the parent constructor
+   * @param error   cause, forwarded to the parent constructor
+   */
+  public ExecutionFailed(String message, Throwable error) {
+    super(message, error);
+  }
+
+  /**
+   * Builds the failure from a message alone; a new {@link Exception} created
+   * from that message is used as the cause.
+   *
+   * @param message failure message
+   */
+  public ExecutionFailed(String message) {
+    this(message, new Exception(message));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
new file mode 100644
index 0000000..af1e69d
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Failure.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+public class Failure {
+  private final Throwable error;
+  private final String message;
+
+  public Failure(String message, Throwable error) {
+    this.message = message;
+    this.error = error;
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
new file mode 100644
index 0000000..10a2d4e
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/FetchFailed.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * A {@link Failure} subtype; it adds no state or behaviour of its own, only
+ * two constructors.
+ */
+public class FetchFailed extends Failure {
+
+  /**
+   * @param message failure message, forwarded to the parent constructor
+   * @param error   cause, forwarded to the parent constructor
+   */
+  public FetchFailed(String message, Throwable error) {
+    super(message, error);
+  }
+
+  /**
+   * Builds the failure from a message alone; a new {@link Exception} created
+   * from that message is used as the cause.
+   *
+   * @param message failure message
+   */
+  public FetchFailed(String message) {
+    this(message, new Exception(message));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
new file mode 100644
index 0000000..bfdc1ea
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/Next.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+public class Next {
+}