You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/06/27 23:36:41 UTC
[15/34] ambari git commit: AMBARI-17355 & AMBARI-17354: POC: FE & BE
changes for first class support for Yarn hosted services
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParser.java
new file mode 100644
index 0000000..e465276
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParser.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Parser of ATS responses
+ */
+public class ATSParser implements IATSParser {
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(ATSParser.class);
+
+  /** Divisor that converts the millisecond timestamps of ATS entities to seconds. */
+  private static final long MILLIS_IN_SECOND = 1000L;
+
+  /** Performs the ATS requests and builds the entity URLs. */
+  private final ATSRequestsDelegate delegate;
+
+  /**
+   * @param delegate source of the raw ATS JSON responses interpreted by this parser
+   */
+  public ATSParser(ATSRequestsDelegate delegate) {
+    this.delegate = delegate;
+  }
+
+  /**
+   * Lists the Hive queries that the delegate returns for the given user.
+   * Entities that cannot be parsed are logged and left out of the result.
+   *
+   * @param username user name passed to the delegate
+   * @return parsed queries; empty when the response carries no entities
+   */
+  @Override
+  public List<HiveQueryId> getHiveQueryIdsList(String username) {
+    JSONArray jobs = extractEntities(delegate.hiveQueryIdList(username));
+
+    List<HiveQueryId> parsedJobs = new LinkedList<HiveQueryId>();
+    for (Object job : jobs) {
+      try {
+        parsedJobs.add(parseAtsHiveJob((JSONObject) job));
+      } catch (Exception ex) {
+        LOG.error("Error while parsing ATS job", ex);
+      }
+    }
+
+    return parsedJobs;
+  }
+
+  /**
+   * Lists the vertices that the delegate returns for the given DAG.
+   * Entities that cannot be parsed are logged and left out of the result.
+   *
+   * @param dagId DAG id passed to the delegate
+   * @return parsed vertices; empty when the response carries no entities
+   */
+  @Override
+  public List<TezVertexId> getVerticesForDAGId(String dagId) {
+    JSONArray vertices = extractEntities(delegate.tezVerticesListForDAG(dagId));
+
+    List<TezVertexId> parsedVertices = new LinkedList<TezVertexId>();
+    for (Object vertex : vertices) {
+      try {
+        parsedVertices.add(parseVertex((JSONObject) vertex));
+      } catch (Exception ex) {
+        LOG.error("Error while parsing the vertex", ex);
+      }
+    }
+
+    return parsedVertices;
+  }
+
+  /**
+   * Looks up the Hive query registered under the given operation id.
+   *
+   * @param guidString operation id passed to the delegate
+   * @return the parsed first entity, or a blank {@link HiveQueryId} when none was returned
+   */
+  @Override
+  public HiveQueryId getHiveQueryIdByOperationId(String guidString) {
+    JSONArray jobs = extractEntities(delegate.hiveQueryIdByOperationId(guidString));
+
+    assert jobs.size() <= 1;
+    if (jobs.isEmpty()) {
+      return new HiveQueryId();
+    }
+
+    return parseAtsHiveJob((JSONObject) jobs.get(0));
+  }
+
+  /**
+   * @param name DAG name passed to the delegate
+   * @return the parsed first entity, or a default {@link TezDagId} when none was returned
+   */
+  @Override
+  public TezDagId getTezDAGByName(String name) {
+    return parseTezDag(extractEntities(delegate.tezDagByName(name)));
+  }
+
+  /**
+   * @param entity entity value passed to the delegate
+   * @return the parsed first entity, or a default {@link TezDagId} when none was returned
+   */
+  @Override
+  public TezDagId getTezDAGByEntity(String entity) {
+    return parseTezDag(extractEntities(delegate.tezDagByEntity(entity)));
+  }
+
+  /**
+   * Returns the "entities" array of an ATS response. A null response, or one
+   * whose "entities" member is absent or not an array, yields an empty array
+   * so that callers fall back to their "nothing found" results.
+   */
+  private static JSONArray extractEntities(JSONObject response) {
+    Object entities = (response == null) ? null : response.get("entities");
+    if (entities instanceof JSONArray) {
+      return (JSONArray) entities;
+    }
+    LOG.warn("ATS response does not contain an \"entities\" array, treating it as empty");
+    return new JSONArray();
+  }
+
+  /**
+   * Returns the first element of the array stored under {@code key}, or null
+   * when the filters object, the key, or the first element is missing.
+   */
+  private static String firstFilterValue(JSONObject filters, String key) {
+    if (filters == null) {
+      return null;
+    }
+    JSONArray values = (JSONArray) filters.get(key);
+    if (values == null || values.isEmpty()) {
+      return null;
+    }
+    return (String) values.get(0);
+  }
+
+  /**
+   * Converts the first Tez DAG entity of the array. Fields that are absent
+   * from the entity's "primaryfilters" / "otherinfo" keep the defaults of
+   * {@link TezDagId}.
+   */
+  private TezDagId parseTezDag(JSONArray tezDagEntities) {
+    assert tezDagEntities.size() <= 1;
+    TezDagId parsedDag = new TezDagId();
+    if (tezDagEntities.isEmpty()) {
+      return parsedDag;
+    }
+    JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0);
+
+    parsedDag.entity = (String) tezDagEntity.get("entity");
+
+    String applicationId =
+      firstFilterValue((JSONObject) tezDagEntity.get("primaryfilters"), "applicationId");
+    if (applicationId != null) {
+      parsedDag.applicationId = applicationId;
+    }
+
+    JSONObject otherinfo = (JSONObject) tezDagEntity.get("otherinfo");
+    if (otherinfo != null && otherinfo.get("status") != null) {
+      parsedDag.status = (String) otherinfo.get("status");
+    }
+    return parsedDag;
+  }
+
+  /**
+   * Converts one HIVE_QUERY_ID entity. The start time and the timestamp of
+   * the entity's first listed event are mandatory; an entity lacking them
+   * makes this method throw.
+   */
+  private HiveQueryId parseAtsHiveJob(JSONObject job) {
+    HiveQueryId parsedJob = new HiveQueryId();
+
+    String entity = (String) job.get("entity");
+    parsedJob.entity = entity;
+    parsedJob.url = delegate.hiveQueryIdDirectUrl(entity);
+    parsedJob.starttime = ((Long) job.get("starttime")) / MILLIS_IN_SECOND;
+
+    JSONObject primaryfilters = (JSONObject) job.get("primaryfilters");
+    parsedJob.operationId = firstFilterValue(primaryfilters, "operationid");
+    parsedJob.user = firstFilterValue(primaryfilters, "user");
+
+    // Duration in seconds: time of the last event minus the start time.
+    long lastEventTimestamp = ((Long) getLastEvent(job).get("timestamp")) / MILLIS_IN_SECOND;
+    parsedJob.duration = lastEventTimestamp - parsedJob.starttime;
+
+    JSONObject otherinfo = (JSONObject) job.get("otherinfo");
+    if (otherinfo == null) {
+      return parsedJob;
+    }
+
+    if (otherinfo.get("QUERY") != null) {  // workaround for HIVE-10829
+      parseQueryInfo(parsedJob, (String) otherinfo.get("QUERY"));
+    }
+
+    if (otherinfo.get("VERSION") != null) {
+      parsedJob.version = (Long) otherinfo.get("VERSION");
+    }
+    return parsedJob;
+  }
+
+  /**
+   * Fills the query text, the stage list and the Tez DAG names of the job
+   * from the JSON document stored in the entity's "QUERY" attribute.
+   */
+  private void parseQueryInfo(HiveQueryId parsedJob, String queryJson) {
+    JSONObject query = (JSONObject) JSONValue.parse(queryJson);
+
+    parsedJob.query = (String) query.get("queryText");
+    JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS");
+
+    List<String> dagIds = new LinkedList<String>();
+    List<JSONObject> stagesList = new LinkedList<JSONObject>();
+
+    for (Object stageValue : stages.values()) {
+      JSONObject stage = (JSONObject) stageValue;
+      JSONObject tez = (JSONObject) stage.get("Tez");
+      if (tez != null) {
+        dagIds.add((String) tez.get("DagId:"));
+      }
+      stagesList.add(stage);
+    }
+    parsedJob.dagNames = dagIds;
+    parsedJob.stages = stagesList;
+  }
+
+  /**
+   * Converts one vertex entity; the vertex name is only set when the entity
+   * has an "otherinfo" object.
+   */
+  private TezVertexId parseVertex(JSONObject vertex) {
+    TezVertexId tezVertexId = new TezVertexId();
+    tezVertexId.entity = (String) vertex.get("entity");
+    JSONObject otherinfo = (JSONObject) vertex.get("otherinfo");
+    if (otherinfo != null) {
+      tezVertexId.vertexName = (String) otherinfo.get("vertexName");
+    }
+    return tezVertexId;
+  }
+
+  /**
+   * Returns the first element of the entity's "events" array.
+   * NOTE(review): presumably ATS lists events newest-first, which is what
+   * makes element 0 the last event - confirm against the ATS REST API.
+   */
+  private JSONObject getLastEvent(JSONObject atsEntity) {
+    JSONArray events = (JSONArray) atsEntity.get("events");
+    return (JSONObject) events.get(0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParserFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParserFactory.java
new file mode 100644
index 0000000..4ff2621
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSParserFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Creates {@link ATSParser} instances that talk to the ATS configured in the
+ * view's "yarn.ats.url" property.
+ */
+public class ATSParserFactory {
+
+  private final ViewContext context;
+
+  /**
+   * @param context view context whose properties hold the ATS URL
+   */
+  public ATSParserFactory(ViewContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return a new parser backed by a new {@link ATSRequestsDelegateImpl}
+   */
+  public ATSParser getATSParser() {
+    String atsUrl = getATSUrl(context);
+    return new ATSParser(new ATSRequestsDelegateImpl(context, atsUrl));
+  }
+
+  /**
+   * @param context view context to read from
+   * @return value of the "yarn.ats.url" view property
+   */
+  public static String getATSUrl(ViewContext context) {
+    return context.getProperties().get("yarn.ats.url");
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegate.java
new file mode 100644
index 0000000..ac8cd22
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegate.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+/**
+ * Access to the ATS (YARN timeline server): URL builders plus request methods
+ * that return the parsed JSON response. The notes on the individual methods
+ * describe what ATSRequestsDelegateImpl does.
+ */
+public interface ATSRequestsDelegate {
+ /** URL of the single HIVE_QUERY_ID entity with the given entity id. */
+ String hiveQueryIdDirectUrl(String entity);
+
+ /** URL listing HIVE_QUERY_ID entities filtered by the "operationid" primary filter. */
+ String hiveQueryIdOperationIdUrl(String operationId);
+
+ /** URL of the single TEZ_DAG_ID entity with the given entity id. */
+ String tezDagDirectUrl(String entity);
+
+ /** URL listing TEZ_DAG_ID entities filtered by the "dagName" primary filter. */
+ String tezDagNameUrl(String name);
+
+ /** URL listing TEZ_VERTEX_ID entities filtered by the "TEZ_DAG_ID" primary filter. */
+ String tezVerticesListForDAGUrl(String dagId);
+
+ /** Requests the HIVE_QUERY_ID entities filtered by the "requestuser" primary filter. */
+ JSONObject hiveQueryIdList(String username);
+
+ /** Requests the URL built by {@link #hiveQueryIdOperationIdUrl(String)}. */
+ JSONObject hiveQueryIdByOperationId(String operationId);
+
+ /** Requests the URL built by {@link #tezDagNameUrl(String)}. */
+ JSONObject tezDagByName(String name);
+
+ /** Requests the URL built by {@link #tezVerticesListForDAGUrl(String)}. */
+ JSONObject tezVerticesListForDAG(String dagId);
+
+ /** Requests the TEZ_DAG_ID entities filtered by the "callerId" primary filter. */
+ JSONObject tezDagByEntity(String entity);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
new file mode 100644
index 0000000..67497fd
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+public class ATSRequestsDelegateImpl implements ATSRequestsDelegate {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(ATSRequestsDelegateImpl.class);
+ public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }";
+
+ private ViewContext context;
+ private String atsUrl;
+
+ public ATSRequestsDelegateImpl(ViewContext context, String atsUrl) {
+ this.context = context;
+ this.atsUrl = addProtocolIfMissing(atsUrl);
+ }
+
+ private String addProtocolIfMissing(String atsUrl) {
+ if (!atsUrl.matches("^[^:]+://.*$"))
+ atsUrl = "http://" + atsUrl;
+ return atsUrl;
+ }
+
+ @Override
+ public String hiveQueryIdDirectUrl(String entity) {
+ return atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID/" + entity;
+ }
+
+ @Override
+ public String hiveQueryIdOperationIdUrl(String operationId) {
+ // ATS parses operationId started with digit as integer and not returns the response.
+ // Quotation prevents this.
+ return atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=operationid:%22" + operationId + "%22";
+ }
+
+ @Override
+ public String tezDagDirectUrl(String entity) {
+ return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID/" + entity;
+ }
+
+ @Override
+ public String tezDagNameUrl(String name) {
+ return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=dagName:" + name;
+ }
+
+ @Override
+ public String tezVerticesListForDAGUrl(String dagId) {
+ return atsUrl + "/ws/v1/timeline/TEZ_VERTEX_ID?primaryFilter=TEZ_DAG_ID:" + dagId;
+ }
+
+ @Override
+ public JSONObject hiveQueryIdList(String username) {
+ String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=requestuser:" + username;
+ String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }");
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ @Override
+ public JSONObject hiveQueryIdByOperationId(String operationId) {
+ String hiveQueriesListUrl = hiveQueryIdOperationIdUrl(operationId);
+ String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }");
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ @Override
+ public JSONObject tezDagByName(String name) {
+ String tezDagUrl = tezDagNameUrl(name);
+ String response = readFromWithDefault(tezDagUrl, EMPTY_ENTITIES_JSON);
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ @Override
+ public JSONObject tezDagByEntity(String entity) {
+ String tezDagEntityUrl = tezDagEntityUrl(entity);
+ String response = readFromWithDefault(tezDagEntityUrl, EMPTY_ENTITIES_JSON);
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ private String tezDagEntityUrl(String entity) {
+ return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=callerId:" + entity;
+ }
+
+ public boolean checkATSStatus() throws IOException {
+ String url = atsUrl + "/ws/v1/timeline/";
+ InputStream responseInputStream = context.getURLStreamProvider().readAsCurrent(url, "GET",
+ (String)null, new HashMap<String, String>());
+ IOUtils.toString(responseInputStream);
+ return true;
+ }
+
+ @Override
+ public JSONObject tezVerticesListForDAG(String dagId) {
+ String response = readFromWithDefault(tezVerticesListForDAGUrl(dagId), "{ \"entities\" : [ ] }");
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+
+
+ protected String readFromWithDefault(String atsUrl, String defaultResponse) {
+ String response;
+ try {
+ InputStream responseInputStream = context.getURLStreamProvider().readAsCurrent(atsUrl, "GET",
+ (String)null, new HashMap<String, String>());
+ response = IOUtils.toString(responseInputStream);
+ } catch (IOException e) {
+ LOG.error("Error while reading from ATS", e);
+ response = defaultResponse;
+ }
+ return response;
+ }
+
+ public String getAtsUrl() {
+ return atsUrl;
+ }
+
+ public void setAtsUrl(String atsUrl) {
+ this.atsUrl = atsUrl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/HiveQueryId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/HiveQueryId.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/HiveQueryId.java
new file mode 100644
index 0000000..572749d
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/HiveQueryId.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+import java.util.List;
+
+/**
+ * Plain data holder for one HIVE_QUERY_ID entity as parsed by ATSParser.
+ */
+public class HiveQueryId {
+  /** Version returned from ATS 1.5 release. */
+  public static final long ATS_15_RESPONSE_VERSION = 2;
+
+  /** Direct ATS URL of the entity. */
+  public String url;
+
+  /** Value of the entity's "entity" attribute. */
+  public String entity;
+  /** Query text taken from the entity's "QUERY" document. */
+  public String query;
+
+  /** "DagId:" values of the stages that have a "Tez" section. */
+  public List<String> dagNames;
+
+  /** Stage objects of the query plan's "STAGE PLANS" section. */
+  public List<JSONObject> stages;
+
+  /** Start time in seconds (the ATS millisecond value divided by 1000). */
+  public long starttime;
+  /** Seconds between the start time and the timestamp of the entity's first listed event. */
+  public long duration;
+  /** First value of the "operationid" primary filter, if present. */
+  public String operationId;
+  /** First value of the "user" primary filter, if present. */
+  public String user;
+  /** Value of the "VERSION" attribute of "otherinfo", if present. */
+  public long version;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/IATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/IATSParser.java
new file mode 100644
index 0000000..e545c50
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/IATSParser.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+import java.util.List;
+
+/**
+ * Typed view on ATS data: Hive queries, Tez DAGs and Tez vertices.
+ */
+public interface IATSParser {
+ /** Lists the Hive queries found for the given user name. */
+ List<HiveQueryId> getHiveQueryIdsList(String username);
+
+ /** Lists the Tez vertices found for the given DAG id. */
+ List<TezVertexId> getVerticesForDAGId(String dagId);
+
+ /** Finds the Hive query with the given operation id. */
+ HiveQueryId getHiveQueryIdByOperationId(String guidString);
+
+ /** Finds the Tez DAG with the given DAG name. */
+ TezDagId getTezDAGByName(String name);
+
+ /** Finds the Tez DAG related to the given entity value. */
+ TezDagId getTezDAGByEntity(String entity);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezDagId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezDagId.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezDagId.java
new file mode 100644
index 0000000..80fef19
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezDagId.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+/**
+ * Plain data holder for a Tez DAG entity; a freshly created instance has
+ * empty ids and the status {@link #STATUS_UNKNOWN}.
+ */
+public class TezDagId {
+ /** Status value used until a status has been assigned. */
+ public static final String STATUS_UNKNOWN = "UNKNOWN";
+ /** Application id of the DAG; empty by default. */
+ public String applicationId = "";
+ /** Entity id of the DAG; empty by default. */
+ public String entity = "";
+ /** Status of the DAG; {@link #STATUS_UNKNOWN} by default. */
+ public String status = STATUS_UNKNOWN;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezVertexId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezVertexId.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezVertexId.java
new file mode 100644
index 0000000..6ba11bb
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/atsJobs/TezVertexId.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.atsJobs;
+
+/**
+ * Plain data holder for a Tez vertex entity.
+ */
+public class TezVertexId {
+ /** Entity id of the vertex. */
+ public String entity;
+ /** Name of the vertex; may be null when none was available. */
+ public String vertexName;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParser.java
new file mode 100644
index 0000000..7bde8b0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParser.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.rm;
+
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.TezVertexId;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parser of Resource Manager responses
+ */
+public class RMParser {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(RMParser.class);
+ private RMRequestsDelegate delegate;
+
+ public RMParser(RMRequestsDelegate delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Progress of DAG
+ * @param appId App Id
+ * @param dagId DAG Id
+ * @return progress of DAG
+ */
+ public Double getDAGProgress(String appId, String dagId) {
+ String dagIdx = parseDagIdIndex(dagId);
+ JSONObject progresses = delegate.dagProgress(appId, dagIdx);
+
+ double dagProgressValue;
+ if (progresses != null) {
+ JSONObject dagProgress = (JSONObject) progresses.get("dagProgress");
+ dagProgressValue = (Double) (dagProgress.get("progress"));
+ } else {
+ LOG.error("Error while retrieving progress of " + appId + ":" + dagId + ". 0 assumed.");
+ dagProgressValue = 0;
+ }
+ return dagProgressValue;
+ }
+
+ /**
+ * Progress of vertices
+ * @param appId App Id
+ * @param dagId DAG Id
+ * @param vertices vertices list
+ * @return list of vertices
+ */
+ public List<VertexProgress> getDAGVerticesProgress(String appId, String dagId, List<TezVertexId> vertices) {
+ String dagIdx = parseDagIdIndex(dagId);
+
+ Map<String, String> vertexIdToEntityMapping = new HashMap<String, String>();
+ StringBuilder builder = new StringBuilder();
+ if (vertices.size() > 0) {
+ for (TezVertexId vertexId : vertices) {
+ String[] parts = vertexId.entity.split("_");
+ String vertexIdx = parts[parts.length - 1];
+ builder.append(vertexIdx).append(",");
+
+ vertexIdToEntityMapping.put(vertexId.entity, vertexId.vertexName);
+ }
+ builder.setLength(builder.length() - 1); // remove last comma
+ }
+
+ String commaSeparatedVertices = builder.toString();
+
+ List<VertexProgress> parsedVertexProgresses = new LinkedList<VertexProgress>();
+ JSONObject vertexProgressesResponse = delegate.verticesProgress(
+ appId, dagIdx, commaSeparatedVertices);
+ if (vertexProgressesResponse == null) {
+ LOG.error("Error while retrieving progress of vertices " +
+ appId + ":" + dagId + ":" + commaSeparatedVertices + ". 0 assumed for all vertices.");
+ for (TezVertexId vertexId : vertices) {
+ VertexProgress vertexProgressInfo = new VertexProgress();
+ vertexProgressInfo.name = vertexId.vertexName;
+ vertexProgressInfo.progress = 0.0;
+ parsedVertexProgresses.add(vertexProgressInfo);
+ }
+ return parsedVertexProgresses;
+ }
+ JSONArray vertexProgresses = (JSONArray) vertexProgressesResponse.get("vertexProgresses");
+
+ for (Object vertex : vertexProgresses) {
+ JSONObject jsonObject = (JSONObject) vertex;
+
+ VertexProgress vertexProgressInfo = new VertexProgress();
+ vertexProgressInfo.id = (String) jsonObject.get("id");
+ vertexProgressInfo.name = vertexIdToEntityMapping.get(vertexProgressInfo.id);
+ vertexProgressInfo.progress = (Double) jsonObject.get("progress");
+
+ parsedVertexProgresses.add(vertexProgressInfo);
+ }
+ return parsedVertexProgresses;
+ }
+
+ public String parseDagIdIndex(String dagId) {
+ String[] dagIdParts = dagId.split("_");
+ return dagIdParts[dagIdParts.length - 1];
+ }
+
+ public static class VertexProgress {
+ public String id;
+ public String name;
+ public Double progress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParserFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParserFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParserFactory.java
new file mode 100644
index 0000000..4b28e64
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMParserFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.rm;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.utils.ambari.AmbariApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates {@link RMParser} instances for the Resource Manager URL reported
+ * by {@link AmbariApi}.
+ */
+public class RMParserFactory {
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(RMParserFactory.class);
+
+  private final ViewContext context;
+  private final AmbariApi ambariApi;
+
+  /**
+   * @param context view context, also used to create the {@link AmbariApi}
+   */
+  public RMParserFactory(ViewContext context) {
+    this.context = context;
+    this.ambariApi = new AmbariApi(context);
+  }
+
+  /**
+   * @return a new parser backed by a new RMRequestsDelegateImpl for {@link #getRMUrl()}
+   */
+  public RMParser getRMParser() {
+    return new RMParser(new RMRequestsDelegateImpl(context, getRMUrl()));
+  }
+
+  /**
+   * @return Resource Manager URL obtained from the Ambari services helper
+   */
+  public String getRMUrl() {
+    return ambariApi.getServices().getRMUrl();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegate.java
new file mode 100644
index 0000000..5c059c0
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegate.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.rm;
+
+import org.json.simple.JSONObject;
+/** Builds the request URLs for, and fetches, progress reports of a DAG and of its vertices. */
+public interface RMRequestsDelegate {
+ String dagProgressUrl(String appId, String dagIdx); // URL of the progress report of DAG dagIdx in application appId
+ /** URL of the progress report for the given vertices of DAG {@code dagIdx} in application {@code appId}. */
+ String verticesProgressUrl(String appId, String dagIdx, String vertices);
+ /** Fetches the report addressed by {@link #dagProgressUrl} and returns it as a JSON object. */
+ JSONObject dagProgress(String appId, String dagIdx);
+ /** Fetches the report addressed by {@link #verticesProgressUrl} and returns it as a JSON object. */
+ JSONObject verticesProgress(String appId, String dagIdx, String commaSeparatedVertices);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegateImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegateImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegateImpl.java
new file mode 100644
index 0000000..6d2d22e
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/rm/RMRequestsDelegateImpl.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.rm;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.utils.ServiceFormattedException;
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+/** {@link RMRequestsDelegate} that reads Tez progress reports with HTTP GET through the view's URL stream provider. */
+public class RMRequestsDelegateImpl implements RMRequestsDelegate {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(RMRequestsDelegateImpl.class);
+ public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }";
+
+ private ViewContext context;
+ private String rmUrl;
+
+ public RMRequestsDelegateImpl(ViewContext context, String rmUrl) {
+ this.context = context;
+ this.rmUrl = rmUrl;
+ }
+
+ @Override
+ public String dagProgressUrl(String appId, String dagIdx) {
+ return rmUrl + String.format("/proxy/%s/ws/v1/tez/dagProgress?dagID=%s", appId, dagIdx);
+ }
+
+ @Override
+ public String verticesProgressUrl(String appId, String dagIdx, String vertices) {
+ return rmUrl + String.format("/proxy/%s/ws/v1/tez/vertexProgresses?dagID=%s&vertexID=%s", appId, dagIdx, vertices);
+ }
+
+ /** Fetches and parses the progress of one DAG; an IOException is rethrown as ServiceFormattedException (R010) with the cause kept. */
+ @Override
+ public JSONObject dagProgress(String appId, String dagIdx) {
+ String response;
+ try {
+ response = readFrom(dagProgressUrl(appId, dagIdx));
+ } catch (IOException e) {
+ throw new ServiceFormattedException(
+ String.format("R010 DAG %s in app %s not found or ResourceManager is unreachable", dagIdx, appId), e);
+ }
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ /** Fetches and parses the progress of the given vertices; an IOException is rethrown as ServiceFormattedException (R020) with the cause kept. */
+ @Override
+ public JSONObject verticesProgress(String appId, String dagIdx, String commaSeparatedVertices) {
+ String response;
+ try {
+ response = readFrom(verticesProgressUrl(appId, dagIdx, commaSeparatedVertices));
+ } catch (IOException e) {
+ throw new ServiceFormattedException(
+ String.format("R020 DAG %s in app %s not found or ResourceManager is unreachable", dagIdx, appId), e);
+ }
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ /** Reads {@code url}; on an IOException the error is logged and {@code defaultResponse} is returned instead. */
+ protected String readFromWithDefault(String url, String defaultResponse) {
+ try {
+ return readFrom(url);
+ } catch (IOException e) {
+ LOG.error("Error while reading from RM", e);
+ return defaultResponse;
+ }
+ }
+
+ /** GETs {@code url} and returns the body decoded as UTF-8; the stream is closed even when reading fails. */
+ private String readFrom(String url) throws IOException {
+ try (InputStream in = context.getURLStreamProvider().readFrom(url, "GET", (String)null, new HashMap<String, String>())) {
+ return IOUtils.toString(in, "UTF-8");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/IJobControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/IJobControllerFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/IJobControllerFactory.java
new file mode 100644
index 0000000..1e1345e
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/IJobControllerFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+/** Factory that creates the {@link JobController} for a given {@link Job}. */
+public interface IJobControllerFactory {
+ JobController createControllerForJob(Job job);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/Job.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/Job.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/Job.java
new file mode 100644
index 0000000..816e77a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/Job.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+
+import org.apache.ambari.view.hive2.persistence.utils.Indexed;
+import org.apache.ambari.view.hive2.persistence.utils.PersonalResource;
+
+import java.io.Serializable;
+
+/**
+ * Bean interface for a job; it exists so a dynamic Proxy can be created for the bean (see JobControllerImpl#setJobPOJO)
+ */
+public interface Job extends Serializable,Indexed,PersonalResource {
+ String JOB_STATE_UNKNOWN = "UNKNOWN"; // JobImpl uses this as the initial status
+ String JOB_STATE_INITIALIZED = "INITIALIZED";
+ String JOB_STATE_RUNNING = "RUNNING";
+ String JOB_STATE_FINISHED = "SUCCEEDED"; // note: the value is "SUCCEEDED", not "FINISHED"
+ String JOB_STATE_CANCELED = "CANCELED";
+ String JOB_STATE_CLOSED = "CLOSED";
+ String JOB_STATE_ERROR = "ERROR";
+ String JOB_STATE_PENDING = "PENDING";
+
+ String getId();
+
+ void setId(String id);
+
+ String getOwner();
+
+ void setOwner(String owner);
+
+ String getTitle();
+
+ void setTitle(String title);
+ /** File holding the query text; JobControllerImpl reads the query back from it when submitting. */
+ String getQueryFile();
+
+ void setQueryFile(String queryFile);
+ /** Submission time; JobControllerImpl stores it in seconds (System.currentTimeMillis() / 1000). */
+ Long getDateSubmitted();
+
+ void setDateSubmitted(Long dateSubmitted);
+ /** Duration; JobControllerImpl computes it in seconds as "now" minus {@link #getDateSubmitted()}. */
+ Long getDuration();
+
+ void setDuration(Long duration);
+ /** Job state; presumably one of the JOB_STATE_* constants above - confirm against the code that sets it. */
+ String getStatus();
+
+ void setStatus(String status);
+ /** Inline query text; JobControllerImpl writes it to the query file and then replaces it with "". */
+ String getForcedContent();
+
+ void setForcedContent(String forcedContent);
+ /** Id of the saved query whose file JobControllerImpl copies when no forced content is given. */
+ String getQueryId();
+
+ void setQueryId(String queryId);
+ /** Directory in which JobControllerImpl places the job's "query.hql" and "logs" files. */
+ String getStatusDir();
+
+ void setStatusDir(String statusDir);
+ /** Database of the job; JobControllerImpl falls back to "default" when this is null. */
+ String getDataBase();
+
+ void setDataBase(String dataBase);
+
+ String getLogFile();
+
+ void setLogFile(String logFile);
+
+ String getConfFile();
+
+ void setConfFile(String confFile);
+
+ String getApplicationId();
+
+ void setApplicationId(String applicationId);
+
+ String getDagName();
+
+ void setDagName(String dagName);
+
+ String getDagId();
+
+ void setDagId(String dagId);
+
+ String getSessionTag();
+
+ void setSessionTag(String sessionTag);
+
+ String getSqlState();
+
+ void setSqlState(String sqlState);
+
+ String getStatusMessage();
+
+ void setStatusMessage(String message);
+
+ String getReferrer();
+
+ void setReferrer(String referrer);
+
+ String getGlobalSettings();
+
+ void setGlobalSettings(String globalSettings);
+
+ String getGuid();
+
+ void setGuid(String guid);
+
+ String getErrorFile();
+
+ void setErrorFile(String errorFile);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobController.java
new file mode 100644
index 0000000..46a87d9
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobController.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+/** Controls a single {@link Job}: post-creation setup, submission, cancellation and modification tracking. */
+public interface JobController {
+ /** Submits the job for execution. */
+ void submit() throws Throwable;
+ /** Cancels the job. NOTE(review): JobControllerImpl#cancel is currently an empty TODO. */
+ void cancel() throws ItemNotFound;
+ /** Returns the job; JobControllerImpl returns the Proxy it created around the bean. */
+ Job getJob();
+
+ /**
+ * Use carefully. Returns unproxied bean object
+ * @return unproxied bean object
+ */
+ Job getJobPOJO();
+ /** Hook to run after the job was created; JobControllerImpl sets up status dir, query file, log file and submission date. */
+ void afterCreation();
+ /** Refreshes derived job state; JobControllerImpl recomputes the job duration. */
+ void update();
+ /** Whether the modified flag is set; JobControllerImpl sets it in onModification. */
+ boolean isModified();
+ /** Resets the modified flag. */
+ void clearModified();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerFactory.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerFactory.java
new file mode 100644
index 0000000..8428a87
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+
+/** {@link IJobControllerFactory} that builds {@link JobControllerImpl} instances from the shared objects. */
+public class JobControllerFactory implements IJobControllerFactory {
+ private final ViewContext context;
+ private final SharedObjectsFactory sharedObjectsFactory;
+
+ public JobControllerFactory(ViewContext context, SharedObjectsFactory sharedObjectsFactory) {
+ this.context = context;
+ this.sharedObjectsFactory = sharedObjectsFactory;
+ }
+
+ /** Creates a controller for {@code job} using the shared saved-query manager, ATS parser and HDFS API. */
+ @Override
+ public JobController createControllerForJob(Job job) {
+ return new JobControllerImpl(context, job, sharedObjectsFactory.getSavedQueryResourceManager(),
+ sharedObjectsFactory.getATSParser(), sharedObjectsFactory.getHdfsApi());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
new file mode 100644
index 0000000..a3edc8f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.client.AsyncJobRunner;
+import org.apache.ambari.view.hive2.client.AsyncJobRunnerImpl;
+import org.apache.ambari.view.hive2.client.ConnectionConfig;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.ModifyNotificationDelegate;
+import org.apache.ambari.view.hive2.resources.jobs.ModifyNotificationInvocationHandler;
+import org.apache.ambari.view.hive2.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive2.resources.savedQueries.SavedQuery;
+import org.apache.ambari.view.hive2.resources.savedQueries.SavedQueryResourceManager;
+import org.apache.ambari.view.hive2.utils.BadRequestFormattedException;
+import org.apache.ambari.view.hive2.utils.FilePaginator;
+import org.apache.ambari.view.hive2.utils.MisconfigurationFormattedException;
+import org.apache.ambari.view.hive2.utils.ServiceFormattedException;
+import org.apache.ambari.view.hive2.ConnectionFactory;
+import org.apache.ambari.view.hive2.ConnectionSystem;
+import org.apache.ambari.view.hive2.actor.message.AsyncJob;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsApiException;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class JobControllerImpl implements JobController, ModifyNotificationDelegate {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(JobControllerImpl.class);
+
+ private ViewContext context;
+ private HdfsApi hdfsApi;
+ private Job jobUnproxied;
+ private Job job;
+ private boolean modified;
+
+ private SavedQueryResourceManager savedQueryResourceManager;
+ private IATSParser atsParser;
+
+ /**
+ * JobController constructor
+ * Warning: Create JobControllers ONLY using JobControllerFactory!
+ */
+ public JobControllerImpl(ViewContext context, Job job,
+ SavedQueryResourceManager savedQueryResourceManager,
+ IATSParser atsParser,
+ HdfsApi hdfsApi) {
+ this.context = context;
+ setJobPOJO(job);
+ this.savedQueryResourceManager = savedQueryResourceManager;
+ this.atsParser = atsParser;
+ this.hdfsApi = hdfsApi;
+
+ }
+
+ /** Reads the query text (page 0 only) from the job's query file; failures become ServiceFormattedException (F030). */
+ public String getQueryForJob() {
+ FilePaginator paginator = new FilePaginator(job.getQueryFile(), hdfsApi);
+ try {
+ return paginator.readPage(0); //warning - reading only 0 page restricts size of query to 1MB
+ } catch (IOException e) {
+ throw new ServiceFormattedException("F030 Error when reading file " + job.getQueryFile(), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag so callers up the stack still see the interruption
+ throw new ServiceFormattedException("F030 Error when reading file " + job.getQueryFile(), e);
+ }
+ }
+
+ private static final String DEFAULT_DB = "default";
+
+ public String getJobDatabase() {
+ if (job.getDataBase() != null) {
+ return job.getDataBase();
+ } else {
+ return DEFAULT_DB;
+ }
+ }
+
+
+ @Override
+ public void submit() throws Throwable {
+ String jobDatabase = getJobDatabase();
+ String query = getQueryForJob();
+ ConnectionSystem system = ConnectionSystem.getInstance();
+ AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+ AsyncJob asyncJob = new AsyncJob(job.getId(), context.getUsername(), getStatements(jobDatabase, query), job.getLogFile(), context);
+ asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job);
+
+ }
+
+ /** Builds the statement list: {@code use <jobDatabase>} followed by the query split at every ';'. */
+ private String[] getStatements(String jobDatabase, String query) {
+ // NOTE(review): a plain split also cuts at ';' inside string literals and comments and can
+ // yield whitespace-only fragments - confirm the statement executor tolerates both.
+ String[] useStatement = new String[] {"use " + jobDatabase};
+ return ArrayUtils.addAll(useStatement, query.split(";"));
+ }
+
+
+ @Override
+ public void cancel() throws ItemNotFound {
+ //TODO: Cancel job
+ }
+
+ @Override
+ public void update() {
+ updateJobDuration();
+ }
+
+
+ @Override
+ public Job getJob() {
+ return job;
+ }
+
+ /**
+ * Use carefully. Returns unproxied bean object
+ * @return unproxied bean object
+ */
+ @Override
+ public Job getJobPOJO() {
+ return jobUnproxied;
+ }
+
+ public void setJobPOJO(Job jobPOJO) {
+ Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(),
+ new Class[]{Job.class},
+ new ModifyNotificationInvocationHandler(jobPOJO, this));
+ this.job = jobModifyNotificationProxy;
+
+ this.jobUnproxied = jobPOJO;
+ }
+
+
+ @Override
+ public void afterCreation() {
+ setupStatusDirIfNotPresent();
+ setupQueryFileIfNotPresent();
+ setupLogFileIfNotPresent();
+
+ setCreationDate();
+ }
+
+ public void setupLogFileIfNotPresent() {
+ if (job.getLogFile() == null || job.getLogFile().isEmpty()) {
+ setupLogFile();
+ }
+ }
+
+ public void setupQueryFileIfNotPresent() {
+ if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) {
+ setupQueryFile();
+ }
+ }
+
+ public void setupStatusDirIfNotPresent() {
+ if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) {
+ setupStatusDir();
+ }
+ }
+
+ private static final long MillisInSecond = 1000L;
+
+ public void updateJobDuration() {
+ job.setDuration(System.currentTimeMillis() / MillisInSecond - job.getDateSubmitted());
+ }
+
+ public void setCreationDate() {
+ job.setDateSubmitted(System.currentTimeMillis() / MillisInSecond);
+ }
+
+
+ private void setupLogFile() {
+ LOG.debug("Creating log file for job#" + job.getId());
+
+ String logFile = job.getStatusDir() + "/" + "logs";
+ try {
+ HdfsUtil.putStringToFile(hdfsApi, logFile, "");
+ } catch (HdfsApiException e) {
+ throw new ServiceFormattedException(e);
+ }
+
+ job.setLogFile(logFile);
+ LOG.debug("Log file for job#" + job.getId() + ": " + logFile);
+ }
+
+ private void setupStatusDir() {
+ String newDirPrefix = makeStatusDirectoryPrefix();
+ String newDir = null;
+ try {
+ newDir = HdfsUtil.findUnallocatedFileName(hdfsApi, newDirPrefix, "");
+ } catch (HdfsApiException e) {
+ throw new ServiceFormattedException(e);
+ }
+
+ job.setStatusDir(newDir);
+ LOG.debug("Status dir for job#" + job.getId() + ": " + newDir);
+ }
+
+ /**
+ * Builds {@code <jobs.dir>/hive-job-<id>-<timestamp>}; throws MisconfigurationFormattedException when "jobs.dir" is unset.
+ */
+ private String makeStatusDirectoryPrefix() {
+ String userScriptsPath = context.getProperties().get("jobs.dir");
+ if (userScriptsPath == null) { // TODO: move check to initialization code
+ LOG.error("jobs.dir is not configured!");
+ throw new MisconfigurationFormattedException("jobs.dir");
+ }
+ // HH (0-23) rather than hh (1-12): without an AM/PM marker the 12-hour form is ambiguous.
+ String timestamp = new SimpleDateFormat("yyyy-MM-dd_HH-mm").format(new Date());
+ // Plain concatenation: jobs.dir must not be part of a format pattern, a '%' in it would break String.format.
+ return userScriptsPath + "/hive-job-" + job.getId() + "-" + timestamp;
+ }
+
+ private void setupQueryFile() {
+ String statusDir = job.getStatusDir();
+ assert statusDir != null : "setupStatusDir() should be called first";
+
+ String jobQueryFilePath = statusDir + "/" + "query.hql";
+
+ try {
+
+ if (job.getForcedContent() != null) {
+
+ HdfsUtil.putStringToFile(hdfsApi, jobQueryFilePath, job.getForcedContent());
+ job.setForcedContent(""); // prevent forcedContent to be written to DB
+
+ } else if (job.getQueryId() != null) {
+
+ String savedQueryFile = getRelatedSavedQueryFile();
+ hdfsApi.copy(savedQueryFile, jobQueryFilePath);
+ job.setQueryFile(jobQueryFilePath);
+
+ } else {
+
+ throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null);
+
+ }
+
+ } catch (IOException e) {
+ throw new ServiceFormattedException("F040 Error when creating file " + jobQueryFilePath, e);
+ } catch (InterruptedException e) {
+ throw new ServiceFormattedException("F040 Error when creating file " + jobQueryFilePath, e);
+ } catch (HdfsApiException e) {
+ throw new ServiceFormattedException(e);
+ }
+ job.setQueryFile(jobQueryFilePath);
+
+ LOG.debug("Query file for job#" + job.getId() + ": " + jobQueryFilePath);
+ }
+
+
+ private ConnectionConfig getHiveConnectionConfig() {
+ return ConnectionFactory.create(context);
+ }
+
+ private String getRelatedSavedQueryFile() {
+ SavedQuery savedQuery;
+ try {
+ savedQuery = savedQueryResourceManager.read(job.getQueryId());
+ } catch (ItemNotFound itemNotFound) {
+ throw new BadRequestFormattedException("queryId not found!", itemNotFound);
+ }
+ return savedQuery.getQueryFile();
+ }
+
+ @Override
+ public boolean onModification(Object object) {
+ setModified(true);
+ return true;
+ }
+
+ @Override
+ public boolean isModified() {
+ return modified;
+ }
+
+ public void setModified(boolean modified) {
+ this.modified = modified;
+ }
+
+ @Override
+ public void clearModified() {
+ setModified(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobImpl.java
new file mode 100644
index 0000000..b71e2f7
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobImpl.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+import org.apache.commons.beanutils.PropertyUtils;
+
+import java.beans.Transient;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+/**
+ * Bean to represent a job
+ */
+public class JobImpl implements Job {
+ private String title = null;
+ private String queryFile = null;
+ private String statusDir = null;
+ private Long dateSubmitted = 0L;
+ private Long duration = 0L;
+ private String forcedContent = null;
+ private String dataBase = null;
+ private String queryId = null;
+
+ private String status = JOB_STATE_UNKNOWN;
+ private String statusMessage = null;
+ private String sqlState = null;
+
+ private String applicationId;
+ private String dagId;
+ private String dagName;
+
+ private String sessionTag;
+ private String referrer;
+ private String globalSettings;
+
+ private String id = null;
+ private String owner = null;
+
+ private String logFile;
+ private String confFile;
+ private String errorFile;
+
+ private String guid = null;
+
+ public JobImpl() {}
+ public JobImpl(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
+ for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) {
+ try {
+ PropertyUtils.setProperty(this, entry.getKey(), entry.getValue());
+ } catch (NoSuchMethodException e) {
+ // deliberate: the key has no matching bean property here, so this entry is ignored
+ }
+ }
+ }
+
+ /** Two jobs are equal when their ids are equal (or both null). */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Job)) return false;
+
+ // Read the id through the Job interface: o may be a Job that is not a JobImpl
+ // (e.g. a dynamic proxy), and casting such an object to JobImpl would throw.
+ String otherId = ((Job) o).getId();
+ return id != null ? id.equals(otherId) : otherId == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return id != null ? id.hashCode() : 0;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getOwner() {
+ return owner;
+ }
+
+ @Override
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ @Override
+ public String getTitle() {
+ return title;
+ }
+
+ @Override
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ @Override
+ public String getQueryFile() {
+ return queryFile;
+ }
+
+ @Override
+ public void setQueryFile(String queryFile) {
+ this.queryFile = queryFile;
+ }
+
+ @Override
+ public Long getDateSubmitted() {
+ return dateSubmitted;
+ }
+
+ @Override
+ public void setDateSubmitted(Long dateSubmitted) {
+ this.dateSubmitted = dateSubmitted;
+ }
+
+ @Override
+ public Long getDuration() {
+ return duration;
+ }
+
+ @Override
+ public void setDuration(Long duration) {
+ this.duration = duration;
+ }
+
+ @Override
+ public String getStatus() {
+ return status;
+ }
+
+ @Override
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ @Override
+ @Transient
+ public String getForcedContent() {
+ return forcedContent;
+ }
+
+ @Override
+ @Transient
+ public void setForcedContent(String forcedContent) {
+ this.forcedContent = forcedContent;
+ }
+
+ @Override
+ public String getQueryId() {
+ return queryId;
+ }
+
+ @Override
+ public void setQueryId(String queryId) {
+ this.queryId = queryId;
+ }
+
+ @Override
+ public String getStatusDir() {
+ return statusDir;
+ }
+
+ @Override
+ public void setStatusDir(String statusDir) {
+ this.statusDir = statusDir;
+ }
+
+ @Override
+ public String getDataBase() {
+ return dataBase;
+ }
+
+ @Override
+ public void setDataBase(String dataBase) {
+ this.dataBase = dataBase;
+ }
+
+ @Override
+ public String getLogFile() {
+ return logFile;
+ }
+
+ @Override
+ public void setLogFile(String logFile) {
+ this.logFile = logFile;
+ }
+
+ @Override
+ public String getConfFile() {
+ return confFile;
+ }
+
+ @Override
+ public void setConfFile(String confFile) {
+ this.confFile = confFile;
+ }
+
+ @Override
+ public String getApplicationId() {
+ return applicationId;
+ }
+
+ @Override
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ public String getDagName() {
+ return dagName;
+ }
+
+ @Override
+ public void setDagName(String dagName) {
+ this.dagName = dagName;
+ }
+
+ @Override
+ public String getDagId() {
+ return dagId;
+ }
+
+ @Override
+ public void setDagId(String dagId) {
+ this.dagId = dagId;
+ }
+
+ @Override
+ public String getSessionTag() {
+ return sessionTag;
+ }
+
+ @Override
+ public void setSessionTag(String sessionTag) {
+ this.sessionTag = sessionTag;
+ }
+
+ @Override
+ @Transient
+ public String getStatusMessage() {
+ return statusMessage;
+ }
+
+ @Override
+ @Transient
+ public void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ }
+
+ @Override
+ public String getSqlState() {
+ return sqlState;
+ }
+
+ @Override
+ public void setSqlState(String sqlState) {
+ this.sqlState = sqlState;
+ }
+
+ @Override
+ public String getReferrer() {
+ return referrer;
+ }
+
+ @Override
+ public void setReferrer(String referrer) {
+ this.referrer = referrer;
+ }
+
+ @Override
+ public String getGlobalSettings() {
+ return globalSettings;
+ }
+
+ @Override
+ public void setGlobalSettings(String globalSettings) {
+ this.globalSettings = globalSettings;
+ }
+
+ @Override
+ public String getGuid() {
+ return guid;
+ }
+
+ @Override
+ public void setGuid(String guid) {
+ this.guid = guid;
+ }
+
+ @Override
+ public String getErrorFile() {
+ return errorFile;
+ }
+
+ @Override
+ public void setErrorFile(String errorFile) {
+ this.errorFile = errorFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobResourceManager.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobResourceManager.java
new file mode 100644
index 0000000..f91363f
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobResourceManager.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.hive2.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Object that provides CRUD operations for job objects
+ */
+ /**
+  * CRUD resource manager for {@link Job} objects.
+  * <p>
+  * In addition to the inherited storage operations, {@link #create(Job)} and
+  * {@link #read(Object)} obtain a {@link JobController} for the job from the shared
+  * {@link IJobControllerFactory} and store the job again whenever that controller reports
+  * it as modified.
+  */
+ public class JobResourceManager extends PersonalCRUDResourceManager<Job> {
+   private static final Logger LOG = LoggerFactory.getLogger(JobResourceManager.class);
+
+   /** Factory used to wrap jobs into controllers; taken from the shared objects factory. */
+   private final IJobControllerFactory jobControllerFactory;
+
+   /**
+    * Constructor
+    * @param sharedObjectsFactory factory passed to the parent manager and used to look up
+    *                             the job controller factory
+    * @param context View Context instance
+    */
+   public JobResourceManager(SharedObjectsFactory sharedObjectsFactory, ViewContext context) {
+     super(JobImpl.class, sharedObjectsFactory, context);
+     this.jobControllerFactory = sharedObjectsFactory.getJobControllerFactory();
+   }
+
+   /**
+    * Stores the job through the parent manager, then runs the controller's post-creation
+    * hook and saves the job again if the hook modified it.
+    * <p>
+    * A {@link ServiceFormattedException} raised by those steps is handed, together with the
+    * job, to the inherited {@code cleanupAfterErrorAndThrowAgain}.
+    *
+    * @param object job to create
+    * @return the same {@code object} instance that was passed in
+    */
+   @Override
+   public Job create(Job object) {
+     super.create(object);
+     JobController controller = jobControllerFactory.createControllerForJob(object);
+     try {
+       controller.afterCreation();
+       saveIfModified(controller);
+     } catch (ServiceFormattedException e) {
+       cleanupAfterErrorAndThrowAgain(object, e);
+     }
+     return object;
+   }
+
+   /**
+    * Saves the controller's job and clears its modified flag, but only when the controller
+    * reports the job as modified; otherwise does nothing.
+    *
+    * @param jobController controller whose job may need saving
+    */
+   public void saveIfModified(JobController jobController) {
+     if (!jobController.isModified()) {
+       return;
+     }
+     save(jobController.getJobPOJO());
+     jobController.clearModified();
+   }
+
+   /**
+    * Reads the job through the parent manager, lets its controller update it, and saves the
+    * job if that update modified it.
+    *
+    * @param id identifier of the job to read
+    * @return the job read from the parent manager
+    * @throws ItemNotFound propagated from the parent manager's read
+    */
+   @Override
+   public Job read(Object id) throws ItemNotFound {
+     Job loaded = super.read(id);
+     JobController controller = jobControllerFactory.createControllerForJob(loaded);
+     controller.update();
+     saveIfModified(controller);
+     return loaded;
+   }
+
+   /**
+    * Delegates unchanged to the parent manager.
+    *
+    * @param filteringStrategy strategy forwarded to the parent manager
+    * @return whatever the parent manager returns for this strategy
+    */
+   @Override
+   public List<Job> readAll(FilteringStrategy filteringStrategy) {
+     List<Job> jobs = super.readAll(filteringStrategy);
+     return jobs;
+   }
+
+   /**
+    * Delegates unchanged to the parent manager.
+    *
+    * @param resourceId identifier of the job to delete
+    * @throws ItemNotFound propagated from the parent manager's delete
+    */
+   @Override
+   public void delete(Object resourceId) throws ItemNotFound {
+     super.delete(resourceId);
+   }
+
+   /**
+    * Reads the job via {@link #read(Object)} and wraps the result in a new controller.
+    *
+    * @param id identifier of the job to read
+    * @return controller created by the factory for the job that was read
+    * @throws ItemNotFound propagated from {@link #read(Object)}
+    */
+   public JobController readController(Object id) throws ItemNotFound {
+     return jobControllerFactory.createControllerForJob(read(id));
+   }
+
+   /*public Cursor getJobResultsCursor(ExecuteJob job) {
+     try {
+       JobController jobController = jobControllerFactory.createControllerForJob(job);
+       return jobController.getResults();
+     } catch (ItemNotFound itemNotFound) {
+       throw new NotFoundFormattedException("ExecuteJob results are expired", null);
+     }
+   }*/
+
+   //TODO: New implementation
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceItem.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceItem.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceItem.java
new file mode 100644
index 0000000..ed984e4
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceItem.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.resources;
+
+import org.apache.ambari.view.hive2.persistence.utils.PersonalResource;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+/**
+ * Bean to represent file resource
+ */
+public class FileResourceItem implements Serializable, PersonalResource {
+ private String name;
+ private String path;
+
+ private String id;
+ private String owner;
+
+ public FileResourceItem() {}
+ public FileResourceItem(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
+ BeanUtils.populate(this, stringObjectMap);
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getOwner() {
+ return owner;
+ }
+
+ @Override
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceManager.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceManager.java
new file mode 100644
index 0000000..a4ea1f6
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.resources;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive2.persistence.IStorageFactory;
+import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.PersonalCRUDResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Object that provides CRUD operations for resource objects
+ */
+public class FileResourceResourceManager extends PersonalCRUDResourceManager<FileResourceItem> {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(FileResourceResourceManager.class);
+
+ /**
+ * Constructor
+ * @param context View Context instance
+ */
+ public FileResourceResourceManager(IStorageFactory storageFactory, ViewContext context) {
+ super(FileResourceItem.class, storageFactory, context);
+ }
+
+ @Override
+ public FileResourceItem create(FileResourceItem object) {
+ return super.create(object);
+ }
+
+ @Override
+ public FileResourceItem read(Object id) throws ItemNotFound {
+ return super.read(id);
+ }
+
+ @Override
+ public void delete(Object resourceId) throws ItemNotFound {
+ super.delete(resourceId);
+ }
+
+ @Override
+ public List<FileResourceItem> readAll(FilteringStrategy filteringStrategy) {
+ return super.readAll(filteringStrategy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceProvider.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceProvider.java
new file mode 100644
index 0000000..9e7c00a
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceResourceProvider.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.resources;
+
+import org.apache.ambari.view.*;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive2.utils.SharedObjectsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Resource provider for resource
+ */
+public class FileResourceResourceProvider implements ResourceProvider<FileResourceItem> {
+ @Inject
+ ViewContext context;
+
+ protected FileResourceResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(FileResourceResourceProvider.class);
+
+ protected synchronized FileResourceResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new FileResourceResourceManager(new SharedObjectsFactory(context), context);
+ }
+ return resourceManager;
+ }
+
+ @Override
+ public FileResourceItem getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ return getResourceManager().read(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ }
+
+ @Override
+ public Set<FileResourceItem> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ if (context == null) {
+ return new HashSet<FileResourceItem>();
+ }
+ return new HashSet<FileResourceItem>(getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername())));
+ }
+
+ @Override
+ public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
+ FileResourceItem item = null;
+ try {
+ item = new FileResourceItem(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on creating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on creating resource", e);
+ }
+ getResourceManager().create(item);
+ }
+
+ @Override
+ public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ FileResourceItem item = null;
+ try {
+ item = new FileResourceItem(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on updating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on updating resource", e);
+ }
+ try {
+ getResourceManager().update(item, resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ getResourceManager().delete(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b88db3cc/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceService.java
new file mode 100644
index 0000000..2ce74c3
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/resources/FileResourceService.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.resources.resources;
+
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.hive2.BaseService;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive2.utils.NotFoundFormattedException;
+import org.apache.ambari.view.hive2.utils.ServiceFormattedException;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+
+/**
+ * Servlet for Resources
+ * API:
+ * GET /:id
+ * read resource
+ * POST /
+ * create new resource
+ * GET /
+ * get all resource of current user
+ */
+public class FileResourceService extends BaseService {
+ @Inject
+ ViewResourceHandler handler;
+
+ protected FileResourceResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(FileResourceService.class);
+
+ protected synchronized FileResourceResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new FileResourceResourceManager(getSharedObjectsFactory(), context);
+ }
+ return resourceManager;
+ }
+
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{id}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getOne(@PathParam("id") String id) {
+ try {
+ FileResourceItem fileResourceItem = getResourceManager().read(id);
+ JSONObject object = new JSONObject();
+ object.put("fileResource", fileResourceItem);
+ return Response.ok(object).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Delete single item
+ */
+ @DELETE
+ @Path("{id}")
+ public Response delete(@PathParam("id") String id) {
+ try {
+ getResourceManager().delete(id);
+ return Response.status(204).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Get all resources
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getList() {
+ try {
+ LOG.debug("Getting all resources");
+ List items = getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername())); //TODO: move strategy to PersonalCRUDRM
+
+ JSONObject object = new JSONObject();
+ object.put("fileResources", items);
+ return Response.ok(object).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Update item
+ */
+ @PUT
+ @Path("{id}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response update(ResourceRequest request,
+ @PathParam("id") String id) {
+ try {
+ getResourceManager().update(request.fileResource, id);
+ return Response.status(204).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Create resource
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response create(ResourceRequest request, @Context HttpServletResponse response,
+ @Context UriInfo ui) {
+ try {
+ getResourceManager().create(request.fileResource);
+
+ FileResourceItem item = null;
+
+ item = getResourceManager().read(request.fileResource.getId());
+
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), request.fileResource.getId()));
+
+ JSONObject object = new JSONObject();
+ object.put("fileResource", item);
+ return Response.ok(object).status(201).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Wrapper object for json mapping
+ */
+ public static class ResourceRequest {
+ public FileResourceItem fileResource;
+ }
+}