You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/06/27 23:36:46 UTC
[20/34] ambari git commit: AMBARI-17355 & AMBARI-17354: POC: FE & BE
changes for first class support for Yarn hosted services
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/pom.xml b/contrib/views/hive-next/pom.xml
new file mode 100644
index 0000000..c571352
--- /dev/null
+++ b/contrib/views/hive-next/pom.xml
@@ -0,0 +1,369 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.ambari.contrib.views</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ <name>Hive Jdbc</name>
+
+ <parent>
+ <groupId>org.apache.ambari.contrib.views</groupId>
+ <artifactId>ambari-contrib-views</artifactId>
+ <version>2.4.0.0.0</version>
+ </parent>
+
+ <repositories>
+ <repository>
+ <id>repository.apache.org</id>
+ <name>Apache Snapshot repo</name>
+ <url>https://repository.apache.org/content/groups/snapshots</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ <version>1.18</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>1.8</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <version>1.18.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.9</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-views</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.7.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_2.11</artifactId>
+ <version>2.3.15</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari.contrib.views</groupId>
+ <artifactId>ambari-views-utils</artifactId>
+ <version>2.4.0.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-validator</groupId>
+ <artifactId>commons-validator</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.11</artifactId>
+ <version>2.3.15</version>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir>
+ <hive-version>2.1.0-SNAPSHOT</hive-version>
+ <ambari.version>1.3.0.0-SNAPSHOT</ambari.version>
+ </properties>
+ <build>
+ <plugins>
+
+ <!-- Building frontend -->
+ <plugin>
+ <groupId>com.github.eirslett</groupId>
+ <artifactId>frontend-maven-plugin</artifactId>
+ <version>0.0.16</version>
+ <configuration>
+ <nodeVersion>v0.12.2</nodeVersion>
+ <npmVersion>1.4.8</npmVersion>
+ <workingDirectory>src/main/resources/ui/hive-web/</workingDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <id>install node and npm</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>install-node-and-npm</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>npm install</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>npm</goal>
+ </goals>
+ <configuration>
+ <arguments>install --python="${project.basedir}/../src/main/unix/ambari-python-wrap" --unsafe-perm</arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>exec-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <version>1.3.2</version>
+ <executions>
+ <execution>
+ <id>Hive build</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <workingDirectory>${basedir}/src/main/resources/ui/hive-web</workingDirectory>
+ <executable>node/node</executable>
+ <arguments>
+ <argument>node_modules/.bin/ember</argument>
+ <argument>build</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.vafer</groupId>
+ <artifactId>jdeb</artifactId>
+ <version>1.0.1</version>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ <goals>
+ <goal>jdeb</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <submodules>false</submodules>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>false</filtering>
+ <includes>
+ <include>META-INF/**/*</include>
+ <include>view.xml</include>
+ <include>view.log4j.properties</include>
+ <include>application.conf</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>src/main/resources/ui/hive-web/dist</directory>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>src/main/resources/ui/hive-web/bower_components/polestar</directory>
+ <filtering>false</filtering>
+ <targetPath>polestar</targetPath>
+ </resource>
+ <resource>
+ <directory>src/main/resources/ui/hive-web/bower_components/voyager</directory>
+ <filtering>false</filtering>
+ <targetPath>voyager</targetPath>
+ </resource>
+ <resource>
+ <targetPath>WEB-INF/lib</targetPath>
+ <filtering>false</filtering>
+ <directory>target/lib</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
new file mode 100644
index 0000000..773aa55
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/BaseService.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+
+
+/**
+ * Parent service
+ */
+public class BaseService {
+ @Inject
+ protected ViewContext context;
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(BaseService.class);
+
+ private SharedObjectsFactory sharedObjectsFactory;
+ public SharedObjectsFactory getSharedObjectsFactory() {
+ if (sharedObjectsFactory == null) {
+ sharedObjectsFactory = new SharedObjectsFactory(context);
+ }
+ return sharedObjectsFactory;
+ }
+
+ public void setSharedObjectsFactory(SharedObjectsFactory sharedObjectsFactory) {
+ this.sharedObjectsFactory = sharedObjectsFactory;
+ }
+
+ public BaseService() {
+// Thread.currentThread().setContextClassLoader(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
new file mode 100644
index 0000000..918dc68
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive2.actor.message.DDLJob;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.internal.HiveResult;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public interface ConnectionDelegate {
+ Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException;
+ Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException;
+ Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException;
+ Optional<ResultSet> getCurrentResultSet();
+ Optional<HiveStatement> getCurrentStatement();
+ void closeResultSet();
+ void closeStatement();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
new file mode 100644
index 0000000..2f7ffe0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.client.ConnectionConfig;
+
+import java.util.List;
+
+public class ConnectionFactory {
+
+ private static final String ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY = "hive.server2.support.dynamic.service.discovery";
+ private static final String ZK_HIVE_NAMESPACE_KEY = "hive.server2.zookeeper.namespace";
+ private static final String ZK_HIVE_QUORUM = "hive.zookeeper.quorum";
+
+ private static final String AMBARI_HIVE_SERVICE_NAME = "HIVE";
+ private static final String AMBARI_HIVESERVER_COMPONENT_NAME = "HIVE_SERVER";
+
+ private static final String HIVE_SITE = "hive-site";
+ private static final String HIVE_INTERACTIVE_SITE = "hive-interactive-site";
+
+ private static final String HIVE_JDBC_URL_KEY = "hive.jdbc.url";
+ private static final String HIVE_SESSION_PARAMS = "hive.session.params";
+
+ private static final String BINARY_PORT_KEY = "hive.server2.thrift.port";
+ private static final String HTTP_PORT_KEY = "hive.server2.thrift.http.port";
+ private static final String HIVE_TRANSPORT_MODE_KEY = "hive.server2.transport.mode";
+ private static final String HTTP_PATH_KEY = "hive.server2.thrift.http.path";
+
+
+ public static ConnectionConfig create(ViewContext context) {
+ String jdbcUrl;
+ if (context.getCluster() == null) {
+ jdbcUrl = getConnectFromCustom(context);
+ } else {
+ if (zookeeperConfigured(context)) {
+ jdbcUrl = getFromClusterZookeeperConfig(context);
+ } else {
+ jdbcUrl = getFromHiveConfiguration(context);
+ }
+ }
+
+ String userName = context.getUsername();
+ return new ConnectionConfig(userName, "", jdbcUrl);
+ }
+
+ private static String getFromHiveConfiguration(ViewContext context) {
+ String transportMode = context.getCluster().getConfigurationValue(HIVE_SITE, HIVE_TRANSPORT_MODE_KEY);
+ String binaryPort = context.getCluster().getConfigurationValue(HIVE_SITE, BINARY_PORT_KEY);
+ String httpPort = context.getCluster().getConfigurationValue(HIVE_SITE, HTTP_PORT_KEY);
+ String pathKey = context.getCluster().getConfigurationValue(HIVE_SITE, HTTP_PATH_KEY);
+ List<String> hiveHosts = context.getCluster().getHostsForServiceComponent(AMBARI_HIVE_SERVICE_NAME, AMBARI_HIVESERVER_COMPONENT_NAME);
+ String sessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+
+ boolean isBinary = transportMode.equalsIgnoreCase("binary");
+ final String port = isBinary ? binaryPort : httpPort;
+
+ List<String> hostPorts = FluentIterable.from(hiveHosts).transform(new Function<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input + ":" + port;
+ }
+ }).toList();
+
+ String concatHostPorts = Joiner.on(",").join(hostPorts);
+
+ StringBuilder builder = new StringBuilder();
+ builder.append("jdbc:hive2://")
+ .append(concatHostPorts)
+ .append(";")
+ .append(sessionParams);
+
+ if (!isBinary) {
+ builder.append(";").append("transportMode=http;httpPath=").append(pathKey);
+ }
+
+ return builder.toString();
+ }
+
+ private static String getFromClusterZookeeperConfig(ViewContext context) {
+ String quorum = context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_QUORUM);
+ if (quorum == null) {
+ quorum = context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_QUORUM);
+ }
+
+ String namespace = context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_NAMESPACE_KEY);
+ if (namespace == null) {
+ namespace = context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_NAMESPACE_KEY);
+ }
+
+ String sessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+
+ String formatted = String.format("jdbc:hive2://%s/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=%s", quorum, namespace);
+ if(!Strings.isNullOrEmpty(sessionParams)){
+ return formatted + ";" + sessionParams;
+ }
+ return formatted;
+ }
+
+ private static boolean zookeeperConfigured(ViewContext context) {
+ boolean fromHiveSite = Boolean.valueOf(context.getCluster().getConfigurationValue(HIVE_SITE, ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY));
+ boolean fromHiveInteractiveSite = Boolean.valueOf(context.getCluster().getConfigurationValue(HIVE_INTERACTIVE_SITE, ZK_HIVE_DYN_SERVICE_DISCOVERY_KEY));
+ return fromHiveInteractiveSite || fromHiveSite;
+ }
+
+ private static String getConnectFromCustom(ViewContext context) {
+ String jdbcUrl = context.getProperties().get(HIVE_JDBC_URL_KEY);
+ String hiveSessionParams = context.getProperties().get(HIVE_SESSION_PARAMS);
+ return jdbcUrl + ";" + hiveSessionParams;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
new file mode 100644
index 0000000..f026ea6
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.actor.DeathWatch;
+import org.apache.ambari.view.hive2.actor.OperationController;
+import org.apache.ambari.view.hive2.internal.ConnectionSupplier;
+import org.apache.ambari.view.hive2.internal.DataStorageSupplier;
+import org.apache.ambari.view.hive2.internal.HdfsApiSupplier;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class ConnectionSystem {
+
+ private static final String ACTOR_CONF_FILE = "application.conf";
+ private static final String ACTOR_SYSTEM_NAME = "HiveViewActorSystem";
+ private ActorSystem actorSystem = null;
+ private static volatile ConnectionSystem instance = null;
+ private static final Object lock = new Object();
+ private static Map<String, ActorRef> operationControllerMap = new HashMap<>();
+
+ private ConnectionSystem() {
+ this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);;
+ }
+
+ public static ConnectionSystem getInstance() {
+ if(instance == null) {
+ synchronized (lock) {
+ if(instance == null) {
+ instance = new ConnectionSystem();
+ }
+ }
+ }
+ return instance;
+ }
+
+ private ActorRef createOperationController() {
+ ActorRef deathWatch = actorSystem.actorOf(Props.create(DeathWatch.class));
+ return actorSystem.actorOf(
+ Props.create(OperationController.class, actorSystem,deathWatch,
+ new ConnectionSupplier(), new DataStorageSupplier(), new HdfsApiSupplier()));
+ }
+
+ public ActorSystem getActorSystem() {
+ return actorSystem;
+ }
+
+ /**
+ * Returns one operationController per View Instance
+ * @param context
+ * @return operationController Instance
+ */
+ public ActorRef getOperationController(ViewContext context) {
+ String instanceName = context.getInstanceName();
+ ActorRef ref = operationControllerMap.get(instanceName);
+ if(ref == null) {
+ synchronized (lock) {
+ ref = operationControllerMap.get(instanceName);
+ if(ref == null) {
+ ref = createOperationController();
+ operationControllerMap.put(instanceName, ref);
+ }
+ }
+ }
+ return ref;
+ }
+
+ public void shutdown() {
+ if(!actorSystem.isTerminated()) {
+ actorSystem.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
new file mode 100644
index 0000000..b0316ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HelpService.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.hive2.resources.files.FileService;
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.ATSParserFactory;
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.ATSRequestsDelegateImpl;
+import org.json.simple.JSONObject;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Help service
+ */
+public class HelpService extends BaseService {
+ @Inject
+ ViewContext context;
+
+ @Inject
+ protected ViewResourceHandler handler;
+
+ /**
+ * Constructor
+ */
+ public HelpService() {
+ super();
+ }
+
+ /**
+ * Version
+ * @return version
+ */
+ @GET
+ @Path("/version")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response version(){
+ return Response.ok("0.0.1-SNAPSHOT").build();
+ }
+
+ // ================================================================================
+ // Smoke tests
+ // ================================================================================
+
+ /**
+ * HDFS Status
+ * @return status
+ */
+ @GET
+ @Path("/hdfsStatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response hdfsStatus(){
+ FileService.hdfsSmokeTest(context);
+ return getOKResponse();
+ }
+
+ /**
+ * ATS Status
+ * @return status
+ */
+ @GET
+ @Path("/atsStatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response atsStatus() {
+ try {
+ ATSRequestsDelegateImpl atsimpl = new ATSRequestsDelegateImpl(context, ATSParserFactory.getATSUrl(context));
+ atsimpl.checkATSStatus();
+ return getOKResponse();
+ }catch (Exception e){
+ throw new WebApplicationException(e);
+ }
+ }
+
+ private Response getOKResponse() {
+ JSONObject response = new JSONObject();
+ response.put("message", "OK");
+ response.put("trace", null);
+ response.put("status", "200");
+ return Response.ok().entity(response).type(MediaType.APPLICATION_JSON).build();
+ }
+
+ /**
+ * Version
+ * @return version
+ */
+ @GET
+ @Path("/test")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response testStorage(){
+ TestBean test = new TestBean();
+ test.someData = "hello world";
+ getSharedObjectsFactory().getStorage().store(TestBean.class, test);
+ return Response.ok("OK").build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
new file mode 100644
index 0000000..e8d3333
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive2.actor.message.DDLJob;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
+import org.apache.ambari.view.hive2.actor.message.job.Result;
+import org.apache.ambari.view.hive2.internal.HiveResult;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class HiveJdbcConnectionDelegate implements ConnectionDelegate {
+
+ private ResultSet currentResultSet;
+ private HiveStatement currentStatement;
+ private String atsGuid;
+
+ @Override
+ public Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException {
+
+ try {
+ Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
+ currentStatement = (HiveStatement) statement;
+
+ for (String syncStatement : job.getSyncStatements()) {
+ // we don't care about the result
+ // fail all if one fails
+ statement.execute(syncStatement);
+ }
+
+ HiveStatement hiveStatement = (HiveStatement) statement;
+ boolean result = hiveStatement.executeAsync(job.getAsyncStatement());
+ atsGuid = hiveStatement.getYarnATSGuid();
+ if (result) {
+ // query has a result set
+ ResultSet resultSet = hiveStatement.getResultSet();
+ currentResultSet = resultSet;
+ Optional<ResultSet> resultSetOptional = Optional.of(resultSet);
+ return resultSetOptional;
+
+ }
+ return Optional.absent();
+
+ } catch (SQLException e) {
+ // Close the statement on any error
+ currentStatement.close();
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException {
+ try {
+ Statement statement = connection.createStatement();
+ currentStatement = (HiveStatement) statement;
+
+ boolean hasResultSet = false;
+ for (String syncStatement : job.getStatements()) {
+ // we don't care about the result
+ // fail all if one fails
+ hasResultSet = statement.execute(syncStatement);
+ }
+
+ if (hasResultSet) {
+ ResultSet resultSet = statement.getResultSet();
+ //HiveResult result = new HiveResult(resultSet);
+ return Optional.of(resultSet);
+ } else {
+ return Optional.absent();
+ }
+ } catch (SQLException e) {
+ // Close the statement on any error
+ currentStatement.close();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ ResultSet resultSet = metaData.getColumns("", job.getSchemaPattern(), job.getTablePattern(), job.getColumnPattern());
+ currentResultSet = resultSet;
+ return Optional.of(resultSet);
+ }
+
+ @Override
+ public Optional<ResultSet> getCurrentResultSet() {
+ return Optional.fromNullable(currentResultSet);
+ }
+
+ @Override
+ public Optional<HiveStatement> getCurrentStatement() {
+ return Optional.fromNullable(currentStatement);
+ }
+
+ @Override
+ public void closeResultSet() {
+
+ try {
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ }
+ } catch (SQLException e) {
+ // Cannot do anything here
+ }
+ }
+
+ @Override
+ public void closeStatement() {
+ try {
+ if (currentStatement != null) {
+ currentStatement.close();
+ }
+ } catch (SQLException e) {
+ // cannot do anything here
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
new file mode 100644
index 0000000..695c0ab
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveViewImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.View;
+import org.apache.ambari.view.ViewDefinition;
+import org.apache.ambari.view.ViewInstanceDefinition;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+import org.apache.ambari.view.utils.UserLocal;
+
+
+public class HiveViewImpl implements View {
+ @Override
+ public void onDeploy(ViewDefinition definition) {
+
+ }
+
+ @Override
+ public void onCreate(ViewInstanceDefinition definition) {
+
+ }
+
+ @Override
+ public void onDestroy(ViewInstanceDefinition definition) {
+ SharedObjectsFactory.dropInstanceCache(definition.getInstanceName());
+ }
+
+ @Override
+ public void onUpdate(ViewInstanceDefinition definition) {
+ //drop all cached connection for instance
+ UserLocal.dropInstanceCache(definition.getInstanceName());
+ SharedObjectsFactory.dropInstanceCache(definition.getInstanceName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
new file mode 100644
index 0000000..e406366
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/PropertyValidator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.ViewInstanceDefinition;
+import org.apache.ambari.view.utils.ambari.ValidatorUtils;
+import org.apache.ambari.view.validation.ValidationResult;
+import org.apache.ambari.view.validation.Validator;
+
+public class PropertyValidator implements Validator {
+
+ public static final String WEBHDFS_URL = "webhdfs.url";
+ public static final String HIVE_PORT = "hive.port";
+ public static final String YARN_ATS_URL = "yarn.ats.url";
+ public static final String HIVE_SESSION_PARAMS = "hive.session.params";
+
+ @Override
+ public ValidationResult validateInstance(ViewInstanceDefinition viewInstanceDefinition, ValidationContext validationContext) {
+ return null;
+ }
+
+ @Override
+ public ValidationResult validateProperty(String property, ViewInstanceDefinition viewInstanceDefinition, ValidationContext validationContext) {
+ // Validate non cluster associated properties
+ if (property.equals(HIVE_SESSION_PARAMS)) {
+ String auth = viewInstanceDefinition.getPropertyMap().get(HIVE_SESSION_PARAMS);
+
+ if (auth != null && !auth.isEmpty()) {
+ for(String param : auth.split(";")) {
+ String[] keyvalue = param.split("=");
+ if (keyvalue.length != 2) {
+ return new InvalidPropertyValidationResult(false, "Can not parse session param " + param + " in " + auth);
+ }
+ }
+ }
+ }
+
+ // if associated with cluster, no need to validate associated properties
+ Long cluster = viewInstanceDefinition.getClusterHandle();
+ if (cluster != null) {
+ return ValidationResult.SUCCESS;
+ }
+
+ // Cluster associated properties
+ if (property.equals(WEBHDFS_URL)) {
+ String webhdfsUrl = viewInstanceDefinition.getPropertyMap().get(WEBHDFS_URL);
+ if (!ValidatorUtils.validateHdfsURL(webhdfsUrl)) {
+ return new InvalidPropertyValidationResult(false, "Must be valid URL");
+ }
+ }
+
+ if (property.equals(YARN_ATS_URL)) {
+ String atsUrl = viewInstanceDefinition.getPropertyMap().get(YARN_ATS_URL);
+ if (!ValidatorUtils.validateHttpURL(atsUrl)) {
+ return new InvalidPropertyValidationResult(false, "Must be valid URL");
+ }
+ }
+
+ return ValidationResult.SUCCESS;
+ }
+
+ public static class InvalidPropertyValidationResult implements ValidationResult {
+ private boolean valid;
+ private String detail;
+
+ public InvalidPropertyValidationResult(boolean valid, String detail) {
+ this.valid = valid;
+ this.detail = detail;
+ }
+
+ @Override
+ public boolean isValid() {
+ return valid;
+ }
+
+ @Override
+ public String getDetail() {
+ return detail;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
new file mode 100644
index 0000000..4b49f64
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/TestBean.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2;
+
+import org.apache.ambari.view.hive2.persistence.utils.Indexed;
+
+public class TestBean implements Indexed {
+ public String someData;
+ public String id;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
new file mode 100644
index 0000000..9d3517f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.AsyncJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.actor.message.ResultReady;
+import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive2.internal.Either;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+public class AsyncJdbcConnector extends JdbcConnector {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private ActorRef logAggregator = null;
+ private ActorRef asyncQueryExecutor = null;
+ private ActorRef resultSetActor = null;
+
+
+ public AsyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
+ super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
+ }
+
+ @Override
+ protected void handleJobMessage(HiveMessage message) {
+ Object job = message.getMessage();
+ if (job instanceof AsyncJob) {
+ LOG.debug("Executing async job " + message.toString());
+ execute((AsyncJob) job);
+ }
+ }
+
+ @Override
+ protected boolean isAsync() {
+ return true;
+ }
+
+ @Override
+ protected void cleanUpChildren() {
+ if(logAggregator != null && !logAggregator.isTerminated()) {
+ LOG.debug("Sending poison pill to log aggregator");
+ logAggregator.tell(PoisonPill.getInstance(), self());
+ }
+
+ if(asyncQueryExecutor != null && !asyncQueryExecutor.isTerminated()) {
+ LOG.debug("Sending poison pill to Async Query Executor");
+ asyncQueryExecutor.tell(PoisonPill.getInstance(), self());
+ }
+
+ if(resultSetActor != null && !resultSetActor.isTerminated()) {
+ LOG.debug("Sending poison pill to Resultset Actor");
+ resultSetActor.tell(PoisonPill.getInstance(), self());
+ }
+ }
+
+ @Override
+ protected void notifyFailure() {
+ AsyncExecutionFailed failure = new AsyncExecutionFailed(jobId,username,"Cannot connect to hive");
+ parent.tell(failure, self());
+ }
+
+ private void execute(AsyncJob message) {
+ this.executing = true;
+ this.jobId = message.getJobId();
+ updateJobStatus(jobId,Job.JOB_STATE_INITIALIZED);
+ if (connectable == null) {
+ notifyAndCleanUp();
+ return;
+ }
+
+ Optional<HiveConnection> connectionOptional = connectable.getConnection();
+ if (!connectionOptional.isPresent()) {
+ notifyAndCleanUp();
+ return;
+ }
+
+ try {
+ Optional<ResultSet> resultSetOptional = connectionDelegate.execute(connectionOptional.get(), message);
+ Optional<HiveStatement> currentStatement = connectionDelegate.getCurrentStatement();
+ // There should be a result set, which either has a result set, or an empty value
+ // for operations which do not return anything
+
+ logAggregator = getContext().actorOf(
+ Props.create(LogAggregator.class, system, hdfsApi, currentStatement.get(), message.getLogFile())
+ .withDispatcher("akka.actor.misc-dispatcher"), message.getJobId() + ":" +"-logAggregator"
+ );
+ deathWatch.tell(new RegisterActor(logAggregator),self());
+
+ updateGuidInJob(jobId, currentStatement.get());
+ updateJobStatus(jobId,Job.JOB_STATE_RUNNING);
+
+ if (resultSetOptional.isPresent()) {
+ // Start a result set aggregator on the same context, a notice to the parent will kill all these as well
+ // tell the result holder to assign the result set for further operations
+ resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+ resultSetOptional.get(),storage).withDispatcher("akka.actor.result-dispatcher"),
+ "ResultSetActor:ResultSetIterator:JobId:"+ jobId );
+ deathWatch.tell(new RegisterActor(resultSetActor),self());
+ parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>left(resultSetActor)), self());
+
+ // Start a actor to query ATS
+ } else {
+ // Case when this is an Update/query with no results
+ // Wait for operation to complete and add results;
+
+ ActorRef asyncQueryExecutor = getContext().actorOf(
+ Props.create(AsyncQueryExecutor.class, parent, currentStatement.get(),storage,jobId,username)
+ .withDispatcher("akka.actor.result-dispatcher"),
+ message.getJobId() + "-asyncQueryExecutor");
+ deathWatch.tell(new RegisterActor(asyncQueryExecutor),self());
+ parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>right(asyncQueryExecutor)), self());
+
+ }
+ // Start a actor to query log
+ logAggregator.tell(new StartLogAggregation(), self());
+
+
+ } catch (SQLException e) {
+ // update the error on the log
+ AsyncExecutionFailed failure = new AsyncExecutionFailed(message.getJobId(),username,
+ e.getMessage(), e);
+ updateJobStatus(jobId,Job.JOB_STATE_ERROR);
+ parent.tell(failure, self());
+ // Update the operation controller to write an error on tshe right side
+ LOG.error("Caught SQL excpetion for job-"+message,e);
+
+ }
+
+ // Start Inactivity timer to close the statement
+ this.inactivityScheduler = system.scheduler().schedule(
+ Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
+ this.self(), new InactivityCheck(), system.dispatcher(), null);
+ }
+
+ private void notifyAndCleanUp() {
+ updateJobStatus(jobId, Job.JOB_STATE_ERROR);
+ notifyFailure();
+ cleanUp();
+ }
+
+ private void updateJobStatus(String jobId, String jobState) {
+ JobImpl job = null;
+ try {
+ job = storage.load(JobImpl.class, jobId);
+ } catch (ItemNotFound itemNotFound) {
+ itemNotFound.printStackTrace();
+ }
+ job.setStatus(jobState);
+ storage.store(JobImpl.class, job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
new file mode 100644
index 0000000..2a1dbb3
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.internal.AsyncExecutionSuccess;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class AsyncQueryExecutor extends HiveActor {
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private Statement statement;
+ private final Storage storage;
+ private final String jobId;
+ private final ActorRef parent;
+ private final String userName;
+
+ public AsyncQueryExecutor(ActorRef parent, Statement statement, Storage storage, String jobId,String userName) {
+ this.statement = statement;
+ this.storage = storage;
+ this.jobId = jobId;
+ this.parent = parent;
+ this.userName = userName;
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+
+ if (message instanceof ExecuteQuery) {
+ executeQuery();
+ }
+
+ }
+
+ private void executeQuery() {
+ JobImpl job = null;
+ try {
+ job = storage.load(JobImpl.class, jobId);
+ statement.getUpdateCount();
+ LOG.info("Job execution successful. Setting status in db.");
+ job.setStatus(Job.JOB_STATE_FINISHED);
+ storage.store(JobImpl.class, job);
+ sender().tell(new AsyncExecutionSuccess(), self());
+
+ } catch (SQLException e) {
+ job.setStatus(Job.JOB_STATE_ERROR);
+ sender().tell(new AsyncExecutionFailed(jobId,userName, e.getMessage(), e), self());
+ storage.store(JobImpl.class, job);
+ } catch (ItemNotFound itemNotFound) {
+ sender().tell(new AsyncExecutionFailed(jobId,userName, "Cannot load job", itemNotFound), self());
+ } finally {
+ // We can clean up this connection here
+ parent.tell(new CleanUp(), self());
+ }
+
+ }
+
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
new file mode 100644
index 0000000..c146dd0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/DeathWatch.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+public class DeathWatch extends HiveActor {
+
+ private final static Logger LOG =
+ LoggerFactory.getLogger(DeathWatch.class);
+
+ @Override
+ void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if(message instanceof RegisterActor){
+ RegisterActor registerActor = (RegisterActor) message;
+ ActorRef actorRef = registerActor.getActorRef();
+ this.getContext().watch(actorRef);
+ LOG.info("Registered new actor "+ actorRef);
+ LOG.info("Registration for {} at {}", actorRef,new Date());
+ }
+ if(message instanceof Terminated){
+ Terminated terminated = (Terminated) message;
+ ActorRef actor = terminated.actor();
+ LOG.info("Received terminate for actor "+ actor);
+ LOG.info("Termination for {} at {}", actor,new Date());
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
new file mode 100644
index 0000000..c2ee5c7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+public class GetResultHolder {
+
+ private String jobId;
+ private String userName;
+
+ public GetResultHolder(String jobId, String userName) {
+ this.jobId = jobId;
+ this.userName = userName;
+ }
+
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ @Override
+ public String toString() {
+ return "GetResultHolder{" +
+ "jobId='" + jobId + '\'' +
+ ", userName='" + userName + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
new file mode 100644
index 0000000..63e5dd1
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/HiveActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.UntypedActor;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class HiveActor extends UntypedActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ @Override
+ final public void onReceive(Object message) throws Exception {
+ HiveMessage hiveMessage = new HiveMessage(message);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Received message: " + message.getClass().getName() + ", generated id: " + hiveMessage.getId() +
+ " sent by: " + sender() + ", recieved by" + self());
+ }
+
+ handleMessage(hiveMessage);
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Message submitted: " + hiveMessage.getId());
+
+ }
+ }
+
+ abstract void handleMessage(HiveMessage hiveMessage);
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
new file mode 100644
index 0000000..7769dde
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.Connect;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive2.actor.message.lifecycle.TerminateInactivityCheck;
+import org.apache.ambari.view.hive2.internal.Connectable;
+import org.apache.ambari.view.hive2.internal.ConnectionException;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive2.utils.HiveActorConfiguration;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wraps one Jdbc connection per user, per instance. This is used to delegate execute the statements and
+ * creates child actors to delegate the resultset extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation
+ */
+public abstract class JdbcConnector extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Interval for maximum inactivity allowed
+ */
+ private final static long MAX_INACTIVITY_INTERVAL = 5 * 60 * 1000;
+
+ /**
+ * Interval for maximum inactivity allowed before termination
+ */
+ private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000;
+
+ protected final ViewContext viewContext;
+ protected final ActorSystem system;
+ protected final Storage storage;
+
+ /**
+ * Keeps track of the timestamp when the last activity has happened. This is
+ * used to calculate the inactivity period and take lifecycle decisions based
+ * on it.
+ */
+ private long lastActivityTimestamp;
+
+ /**
+ * Akka scheduler to tick at an interval to deal with inactivity of this actor
+ */
+ protected Cancellable inactivityScheduler;
+
+ /**
+ * Akka scheduler to tick at an interval to deal with the inactivity after which
+ * the actor should be killed and connectable should be released
+ */
+ protected Cancellable terminateActorScheduler;
+
+ protected Connectable connectable = null;
+ protected final ActorRef deathWatch;
+ protected final ConnectionDelegate connectionDelegate;
+ protected final ActorRef parent;
+ protected final HdfsApi hdfsApi;
+
+ /**
+ * true if the actor is currently executing any job.
+ */
+ protected boolean executing = false;
+
+ /**
+ * true if the currently executing job is async job.
+ */
+ private boolean async = true;
+
+ /**
+ * Returns the timeout configurations.
+ */
+ private final HiveActorConfiguration actorConfiguration;
+ protected String username;
+ protected String jobId;
+
+ public JdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent, ActorRef deathWatch,
+ ConnectionDelegate connectionDelegate, Storage storage) {
+ this.viewContext = viewContext;
+ this.hdfsApi = hdfsApi;
+ this.system = system;
+ this.parent = parent;
+ this.deathWatch = deathWatch;
+ this.connectionDelegate = connectionDelegate;
+ this.storage = storage;
+ this.lastActivityTimestamp = System.currentTimeMillis();
+ actorConfiguration = new HiveActorConfiguration(viewContext);
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if (message instanceof InactivityCheck) {
+ checkInactivity();
+ } else if (message instanceof TerminateInactivityCheck) {
+ checkTerminationInactivity();
+ } else if (message instanceof KeepAlive) {
+ keepAlive();
+ } else if (message instanceof CleanUp) {
+ cleanUp();
+ } else if (message instanceof JobExecutionCompleted) {
+ jobExecutionCompleted();
+ } else {
+ handleNonLifecycleMessage(hiveMessage);
+ }
+ }
+
+ private void handleNonLifecycleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ keepAlive();
+ if (message instanceof Connect) {
+ connect((Connect) message);
+ } else {
+ handleJobMessage(hiveMessage);
+ }
+
+ }
+
+ protected abstract void handleJobMessage(HiveMessage message);
+
+ protected abstract boolean isAsync();
+
+ protected abstract void notifyFailure();
+
+ protected abstract void cleanUpChildren();
+
+ private void keepAlive() {
+ lastActivityTimestamp = System.currentTimeMillis();
+ }
+
+ private void jobExecutionCompleted() {
+ // Set is executing as false so that the inactivity checks can finish cleanup
+ // after timeout
+ LOG.info("Job execution completed for user: {}. Results are ready to be fetched", username);
+ this.executing = false;
+ }
+
+ protected Optional<String> getJobId() {
+ return Optional.fromNullable(jobId);
+ }
+
+ protected Optional<String> getUsername() {
+ return Optional.fromNullable(username);
+ }
+
+ private void connect(Connect message) {
+ this.username = message.getUsername();
+ // check the connectable
+ if (connectable == null) {
+ connectable = message.getConnectable();
+ }
+ // make the connectable to Hive
+ try {
+ if (!connectable.isOpen()) {
+ connectable.connect();
+ }
+ } catch (ConnectionException e) {
+ // set up job failure
+ // notify parent about job failure
+ this.notifyFailure();
+ cleanUp();
+ return;
+ }
+
+ this.terminateActorScheduler = system.scheduler().schedule(
+ Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
+ this.getSelf(), new TerminateInactivityCheck(), system.dispatcher(), null);
+
+ }
+
+ protected void updateGuidInJob(String jobId, HiveStatement statement) {
+ String yarnAtsGuid = statement.getYarnATSGuid();
+ try {
+ JobImpl job = storage.load(JobImpl.class, jobId);
+ job.setGuid(yarnAtsGuid);
+ storage.store(JobImpl.class, job);
+ } catch (ItemNotFound itemNotFound) {
+ // Cannot do anything if the job is not present
+ }
+ }
+
+ private void checkInactivity() {
+ LOG.info("Inactivity check, executing status: {}", executing);
+ if (executing) {
+ keepAlive();
+ return;
+ }
+ long current = System.currentTimeMillis();
+ if ((current - lastActivityTimestamp) > actorConfiguration.getInactivityTimeout(MAX_INACTIVITY_INTERVAL)) {
+ // Stop all the sub-actors created
+ cleanUp();
+ }
+ }
+
+ private void checkTerminationInactivity() {
+ if (!isAsync()) {
+ // Should not terminate if job is sync. Will terminate after the job is finished.
+ stopTeminateInactivityScheduler();
+ return;
+ }
+
+ LOG.info("Termination check, executing status: {}", executing);
+ if (executing) {
+ keepAlive();
+ return;
+ }
+
+ long current = System.currentTimeMillis();
+ if ((current - lastActivityTimestamp) > actorConfiguration.getTerminationTimeout(MAX_TERMINATION_INACTIVITY_INTERVAL)) {
+ cleanUpWithTermination();
+ }
+ }
+
+ protected void cleanUp() {
+ if(jobId != null) {
+ LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId);
+ } else {
+ LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name());
+ }
+ this.executing = false;
+ cleanUpStatementAndResultSet();
+ cleanUpChildren();
+ stopInactivityScheduler();
+ parent.tell(new FreeConnector(username, jobId, isAsync()), self());
+ }
+
+ protected void cleanUpWithTermination() {
+ LOG.debug("{} :: Cleaning up resources with inactivity for Sync execution.", self().path().name());
+ cleanUpStatementAndResultSet();
+
+ cleanUpChildren();
+ stopInactivityScheduler();
+ stopTeminateInactivityScheduler();
+ parent.tell(new DestroyConnector(username, jobId, isAsync()), this.self());
+ self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+
+ private void cleanUpStatementAndResultSet() {
+ connectionDelegate.closeStatement();
+ connectionDelegate.closeResultSet();
+ }
+
+ private void stopTeminateInactivityScheduler() {
+ if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) {
+ terminateActorScheduler.cancel();
+ }
+ }
+
+ private void stopInactivityScheduler() {
+ if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) {
+ inactivityScheduler.cancel();
+ }
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ stopInactivityScheduler();
+ stopTeminateInactivityScheduler();
+
+ if (connectable.isOpen()) {
+ connectable.disconnect();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
new file mode 100644
index 0000000..284345d
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Cancellable;
+import com.google.common.base.Joiner;
+import org.apache.ambari.view.hive2.actor.message.GetMoreLogs;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.LogAggregationFinished;
+import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsApiException;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads the logs for a ExecuteJob from the Statement and writes them into hdfs.
+ */
+public class LogAggregator extends HiveActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ public static final int AGGREGATION_INTERVAL = 5 * 1000;
+ private final HdfsApi hdfsApi;
+ private final HiveStatement statement;
+ private final String logFile;
+ private final ActorSystem system;
+
+ private Cancellable moreLogsScheduler;
+ private ActorRef parent;
+
+ public LogAggregator(ActorSystem system, HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+ this.system = system;
+ this.hdfsApi = hdfsApi;
+ this.statement = statement;
+ this.logFile = logFile;
+ }
+
+ @Override
+ public void handleMessage(HiveMessage hiveMessage) {
+ Object message = hiveMessage.getMessage();
+ if (message instanceof StartLogAggregation) {
+ start();
+ }
+
+ if (message instanceof GetMoreLogs) {
+ try {
+ getMoreLogs();
+ } catch (SQLException e) {
+ LOG.error("SQL Error while getting logs. Tried writing to: {}", logFile);
+ } catch (HdfsApiException e) {
+ LOG.warn("HDFS Error while getting writing logs to {}", logFile);
+
+ }
+ }
+ }
+
+ private void start() {
+ parent = this.getSender();
+ this.moreLogsScheduler = system.scheduler().schedule(
+ Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+ this.getSelf(), new GetMoreLogs(), system.dispatcher(), null);
+ }
+
+ private void getMoreLogs() throws SQLException, HdfsApiException {
+ if (statement.hasMoreLogs()) {
+ List<String> logs = statement.getQueryLog();
+ String allLogs = Joiner.on("\n").skipNulls().join(logs);
+ HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+ } else {
+ moreLogsScheduler.cancel();
+ parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+ }
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ if(moreLogsScheduler != null && !moreLogsScheduler.isCancelled()){
+ moreLogsScheduler.cancel();
+ }
+
+ }
+
+}