You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2014/05/07 21:38:58 UTC
[4/4] git commit: AMBARI-5704. Pig View Cleanup. (mahadev)
AMBARI-5704. Pig View Cleanup. (mahadev)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c64261e2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c64261e2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c64261e2
Branch: refs/heads/branch-1.6.0
Commit: c64261e26961f1d290821c0098e08c8b69f51606
Parents: 40e5707
Author: Mahadev Konar <ma...@apache.org>
Authored: Wed May 7 12:38:42 2014 -0700
Committer: Mahadev Konar <ma...@apache.org>
Committed: Wed May 7 12:38:42 2014 -0700
----------------------------------------------------------------------
.../ambari/view/pig/PigServiceRouter.java | 29 +-
.../persistence/InstanceKeyValueStorage.java | 130 ++--
.../view/pig/persistence/KeyValueStorage.java | 232 +++---
.../pig/persistence/LocalKeyValueStorage.java | 52 +-
.../persistence/PersistentConfiguration.java | 31 +-
.../ambari/view/pig/persistence/Storage.java | 51 +-
.../utils/ContextConfigurationAdapter.java | 445 +++++------
.../persistence/utils/FilteringStrategy.java | 10 +-
.../view/pig/persistence/utils/Indexed.java | 16 +-
.../pig/persistence/utils/ItemNotFound.java | 3 +
.../utils/OnlyOwnersFilteringStrategy.java | 18 +-
.../view/pig/persistence/utils/Owned.java | 16 +-
.../view/pig/persistence/utils/StorageUtil.java | 57 +-
.../view/pig/resources/CRUDResourceManager.java | 118 +--
.../resources/PersonalCRUDResourceManager.java | 89 ++-
.../resources/SharedCRUDResourceManager.java | 31 +-
.../view/pig/resources/files/FileResource.java | 53 +-
.../view/pig/resources/files/FileService.java | 161 ++--
.../pig/resources/jobs/JobResourceManager.java | 422 ++++++-----
.../pig/resources/jobs/JobResourceProvider.java | 113 +--
.../view/pig/resources/jobs/JobService.java | 359 ++++-----
.../view/pig/resources/jobs/models/PigJob.java | 408 +++++-----
.../pig/resources/jobs/utils/JobPolling.java | 175 ++---
.../scripts/ScriptResourceManager.java | 121 +--
.../scripts/ScriptResourceProvider.java | 117 +--
.../pig/resources/scripts/ScriptService.java | 199 ++---
.../pig/resources/scripts/models/PigScript.java | 146 ++--
.../pig/resources/udf/UDFResourceManager.java | 17 +-
.../pig/resources/udf/UDFResourceProvider.java | 117 +--
.../view/pig/resources/udf/UDFService.java | 199 ++---
.../view/pig/resources/udf/models/UDF.java | 74 +-
.../ambari/view/pig/services/BaseService.java | 158 ++--
.../ambari/view/pig/services/HelpService.java | 60 +-
.../view/pig/templeton/client/Request.java | 376 +++++-----
.../view/pig/templeton/client/TempletonApi.java | 274 ++++---
.../pig/templeton/client/TempletonRequest.java | 95 ++-
.../ambari/view/pig/utils/FilePaginator.java | 113 +--
.../apache/ambari/view/pig/utils/HdfsApi.java | 344 +++++----
.../org/apache/ambari/view/pig/BasePigTest.java | 90 +--
.../org/apache/ambari/view/pig/HDFSTest.java | 58 +-
.../apache/ambari/view/pig/test/FileTest.java | 328 ++++-----
.../apache/ambari/view/pig/test/HelpTest.java | 58 +-
.../apache/ambari/view/pig/test/JobTest.java | 738 +++++++++----------
.../view/pig/test/ScriptTestHDFSUnmanaged.java | 112 +--
.../view/pig/test/ScriptTestUnmanaged.java | 60 +-
.../apache/ambari/view/pig/test/UDFTest.java | 154 ++--
46 files changed, 3766 insertions(+), 3261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/PigServiceRouter.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/PigServiceRouter.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/PigServiceRouter.java
index e1098e7..14f3b4a 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/PigServiceRouter.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/PigServiceRouter.java
@@ -29,20 +29,27 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.Path;
+/**
+ * Pig service
+ */
public class PigServiceRouter {
- @Inject
- ViewContext context;
+ @Inject
+ ViewContext context;
- @Inject
- protected ViewResourceHandler handler;
+ @Inject
+ protected ViewResourceHandler handler;
- protected final static Logger LOG =
- LoggerFactory.getLogger(PigServiceRouter.class);
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(PigServiceRouter.class);
- private Storage storage = null;
+ private Storage storage = null;
- @Path("/help")
- public HelpService help(){
- return new HelpService(context, handler);
- }
+ /**
+ * Help service
+ * @return help service
+ */
+ @Path("/help")
+ public HelpService help(){
+ return new HelpService(context, handler);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/InstanceKeyValueStorage.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/InstanceKeyValueStorage.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/InstanceKeyValueStorage.java
index 101dcb9..a5ccc35 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/InstanceKeyValueStorage.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/InstanceKeyValueStorage.java
@@ -31,77 +31,85 @@ import org.slf4j.LoggerFactory;
* Path to file should be in 'dataworker.storagePath' parameter
*/
public class InstanceKeyValueStorage extends KeyValueStorage {
- private final static Logger LOG =
- LoggerFactory.getLogger(InstanceKeyValueStorage.class);
+ private final static Logger LOG =
+ LoggerFactory.getLogger(InstanceKeyValueStorage.class);
- private ContextConfigurationAdapter config = null;
- private int VALUE_LENGTH_LIMIT = 254;
+ private ContextConfigurationAdapter config = null;
+ private int VALUE_LENGTH_LIMIT = 254;
- public InstanceKeyValueStorage(ViewContext context) {
- super(context);
- }
+ /**
+ * Constructor.
+ * @param context View Context instance
+ */
+ public InstanceKeyValueStorage(ViewContext context) {
+ super(context);
+ }
- @Override
- protected synchronized Configuration getConfig() {
- if (config == null) {
- config = new ContextConfigurationAdapter(context);
- }
- return config;
+ /**
+ * Returns config instance, adapter to Persistence API
+ * @return config instance
+ */
+ @Override
+ protected synchronized Configuration getConfig() {
+ if (config == null) {
+ config = new ContextConfigurationAdapter(context);
}
+ return config;
+ }
- /**
- * Value is limited to 256 symbols, this code splits value into chunks and saves them as <key>#<chunk_id>
- * @param modelPropName key
- * @param json value
- */
- protected void write(String modelPropName, String json) {
- int saved = 0;
- int page = 1;
- while (saved < json.length()) {
- int end = Math.min(saved + VALUE_LENGTH_LIMIT, json.length());
- String substring = json.substring(saved, end);
- getConfig().setProperty(modelPropName + "#" + page, substring);
- saved += VALUE_LENGTH_LIMIT;
- page += 1;
- LOG.debug("Chunk saved: " + modelPropName + "#" + page + "=" + substring);
- }
- getConfig().setProperty(modelPropName, page - 1);
- LOG.debug("Write finished: " + modelPropName + " pages:" + (page - 1));
+ /**
+ * Value is limited to 256 symbols, this code splits value into chunks and saves them as <key>#<chunk_id>
+ * @param modelPropName key
+ * @param json value
+ */
+ protected void write(String modelPropName, String json) {
+ int saved = 0;
+ int page = 1;
+ while (saved < json.length()) {
+ int end = Math.min(saved + VALUE_LENGTH_LIMIT, json.length());
+ String substring = json.substring(saved, end);
+ getConfig().setProperty(modelPropName + "#" + page, substring);
+ saved += VALUE_LENGTH_LIMIT;
+ page += 1;
+ LOG.debug("Chunk saved: " + modelPropName + "#" + page + "=" + substring);
}
+ getConfig().setProperty(modelPropName, page - 1);
+ LOG.debug("Write finished: " + modelPropName + " pages:" + (page - 1));
+ }
- /**
- * Read chunked value (keys format <key>#<chunk_id>)
- * @param modelPropName key
- * @return value
- */
- protected String read(String modelPropName) {
- StringBuilder result = new StringBuilder();
- int pages = getConfig().getInt(modelPropName);
- LOG.debug("Read started: " + modelPropName + " pages:" + pages);
-
- for(int page = 1; page <= pages; page++) {
- String substring = getConfig().getString(modelPropName + "#" + page);
- LOG.debug("Chunk read: " + modelPropName + "#" + page + "=" + substring);
- if (substring != null) {
- result.append(substring);
- }
- }
+ /**
+ * Read chunked value (keys format <key>#<chunk_id>)
+ * @param modelPropName key
+ * @return value
+ */
+ protected String read(String modelPropName) {
+ StringBuilder result = new StringBuilder();
+ int pages = getConfig().getInt(modelPropName);
+ LOG.debug("Read started: " + modelPropName + " pages:" + pages);
- return result.toString();
+ for(int page = 1; page <= pages; page++) {
+ String substring = getConfig().getString(modelPropName + "#" + page);
+ LOG.debug("Chunk read: " + modelPropName + "#" + page + "=" + substring);
+ if (substring != null) {
+ result.append(substring);
+ }
}
- /**
- * Remove chunked value (keys format <key>#<chunk_id>)
- * @param modelPropName key
- */
- protected void clear(String modelPropName) {
- int pages = getConfig().getInt(modelPropName);
- LOG.debug("Clean started: " + modelPropName + " pages:" + pages);
+ return result.toString();
+ }
+
+ /**
+ * Remove chunked value (keys format <key>#<chunk_id>)
+ * @param modelPropName key
+ */
+ protected void clear(String modelPropName) {
+ int pages = getConfig().getInt(modelPropName);
+ LOG.debug("Clean started: " + modelPropName + " pages:" + pages);
- for(int page = 1; page <= pages; page++) {
- getConfig().clearProperty(modelPropName + "#" + page);
- LOG.debug("Chunk clean: " + modelPropName + "#" + page);
- }
- getConfig().clearProperty(modelPropName);
+ for(int page = 1; page <= pages; page++) {
+ getConfig().clearProperty(modelPropName + "#" + page);
+ LOG.debug("Chunk clean: " + modelPropName + "#" + page);
}
+ getConfig().clearProperty(modelPropName);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/KeyValueStorage.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/KeyValueStorage.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/KeyValueStorage.java
index db18680..1f5fe7f 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/KeyValueStorage.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/KeyValueStorage.java
@@ -32,123 +32,131 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Engine storing objects to key-value storage
+ * Engine for storing objects to key-value storage
*/
public abstract class KeyValueStorage implements Storage {
- private final static Logger LOG =
- LoggerFactory.getLogger(KeyValueStorage.class);
- protected final Gson gson = new Gson();
- protected ViewContext context;
-
- public KeyValueStorage(ViewContext context) {
- this.context = context;
+ private final static Logger LOG =
+ LoggerFactory.getLogger(KeyValueStorage.class);
+ protected final Gson gson = new Gson();
+ protected ViewContext context;
+
+ /**
+ * Constructor
+ * @param context View Context instance
+ */
+ public KeyValueStorage(ViewContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Returns config instance, adapter to Persistence API
+ * @return config instance
+ */
+ protected abstract Configuration getConfig();
+
+ @Override
+ public synchronized void store(Indexed obj) {
+ String modelIndexingPropName = getIndexPropertyName(obj.getClass());
+
+ if (obj.getId() == null) {
+ int lastIndex = getConfig().getInt(modelIndexingPropName, 0);
+ lastIndex ++;
+ getConfig().setProperty(modelIndexingPropName, lastIndex);
+ obj.setId(Integer.toString(lastIndex));
}
- protected abstract Configuration getConfig();
-
- @Override
- public synchronized void store(Indexed obj) {
- String modelIndexingPropName = getIndexPropertyName(obj.getClass());
-
- if (obj.getId() == null) {
- int lastIndex = getConfig().getInt(modelIndexingPropName, 0);
- lastIndex ++;
- getConfig().setProperty(modelIndexingPropName, lastIndex);
- obj.setId(Integer.toString(lastIndex));
- }
-
- String modelPropName = getItemPropertyName(obj.getClass(), Integer.parseInt(obj.getId()));
- String json = serialize(obj);
- write(modelPropName, json);
+ String modelPropName = getItemPropertyName(obj.getClass(), Integer.parseInt(obj.getId()));
+ String json = serialize(obj);
+ write(modelPropName, json);
+ }
+
+ @Override
+ public <T extends Indexed> T load(Class<T> model, int id) throws ItemNotFound {
+ String modelPropName = getItemPropertyName(model, id);
+ LOG.debug(String.format("Loading %s", modelPropName));
+ if (getConfig().containsKey(modelPropName)) {
+ String json = read(modelPropName);
+ LOG.debug(String.format("json: %s", json));
+ return deserialize(model, json);
+ } else {
+ throw new ItemNotFound();
}
-
- @Override
- public <T extends Indexed> T load(Class<T> model, int id) throws ItemNotFound {
- String modelPropName = getItemPropertyName(model, id);
- LOG.debug(String.format("Loading %s", modelPropName));
- if (getConfig().containsKey(modelPropName)) {
- String json = read(modelPropName);
- LOG.debug(String.format("json: %s", json));
- return deserialize(model, json);
- } else {
- throw new ItemNotFound();
+ }
+
+ /**
+ * Write json to storage
+ * @param modelPropName key
+ * @param json value
+ */
+ protected void write(String modelPropName, String json) {
+ getConfig().setProperty(modelPropName, json);
+ }
+
+ /**
+ * Read json from storage
+ * @param modelPropName key
+ * @return value
+ */
+ protected String read(String modelPropName) {
+ return getConfig().getString(modelPropName);
+ }
+
+ /**
+ * Remove line from storage
+ * @param modelPropName key
+ */
+ protected void clear(String modelPropName) {
+ getConfig().clearProperty(modelPropName);
+ }
+
+ protected String serialize(Indexed obj) {
+ return gson.toJson(obj);
+ }
+
+ protected <T extends Indexed> T deserialize(Class<T> model, String json) {
+ return gson.fromJson(json, model);
+ }
+
+ @Override
+ public synchronized <T extends Indexed> List<T> loadAll(Class<T> model, FilteringStrategy filter) {
+ ArrayList<T> list = new ArrayList<T>();
+ String modelIndexingPropName = getIndexPropertyName(model);
+ LOG.debug(String.format("Loading all %s-s", model.getName()));
+ int lastIndex = getConfig().getInt(modelIndexingPropName, 0);
+ for(int i=1; i<=lastIndex; i++) {
+ try {
+ T item = load(model, i);
+ if ((filter == null) || filter.isConform(item)) {
+ list.add(item);
}
+ } catch (ItemNotFound ignored) {
+ }
}
-
- /**
- * Write json to storage
- * @param modelPropName key
- * @param json value
- */
- protected void write(String modelPropName, String json) {
- getConfig().setProperty(modelPropName, json);
- }
-
- /**
- * Read json from storage
- * @param modelPropName key
- * @return value
- */
- protected String read(String modelPropName) {
- return getConfig().getString(modelPropName);
- }
-
- /**
- * Remove line from storage
- * @param modelPropName key
- */
- protected void clear(String modelPropName) {
- getConfig().clearProperty(modelPropName);
- }
-
- protected String serialize(Indexed obj) {
- return gson.toJson(obj);
- }
-
- protected <T extends Indexed> T deserialize(Class<T> model, String json) {
- return gson.fromJson(json, model);
- }
-
- @Override
- public synchronized <T extends Indexed> List<T> loadAll(Class<T> model, FilteringStrategy filter) {
- ArrayList<T> list = new ArrayList<T>();
- String modelIndexingPropName = getIndexPropertyName(model);
- LOG.debug(String.format("Loading all %s-s", model.getName()));
- int lastIndex = getConfig().getInt(modelIndexingPropName, 0);
- for(int i=1; i<=lastIndex; i++) {
- try {
- T item = load(model, i);
- if ((filter == null) || filter.is_conform(item)) {
- list.add(item);
- }
- } catch (ItemNotFound ignored) {
- }
- }
- return list;
- }
-
- @Override
- public synchronized <T extends Indexed> List<T> loadAll(Class<T> model) {
- return loadAll(model, new OnlyOwnersFilteringStrategy(this.context.getUsername()));
- }
-
- @Override
- public synchronized void delete(Class model, int id) {
- LOG.debug(String.format("Deleting %s:%d", model.getName(), id));
- String modelPropName = getItemPropertyName(model, id);
- clear(modelPropName);
- }
-
- @Override
- public boolean exists(Class model, int id) {
- return getConfig().containsKey(getItemPropertyName(model, id));
- }
-
- private String getIndexPropertyName(Class model) {
- return String.format("%s:index", model.getName());
- }
-
- private String getItemPropertyName(Class model, int id) {
- return String.format("%s.%d", model.getName(), id);
- }
+ return list;
+ }
+
+ @Override
+ public synchronized <T extends Indexed> List<T> loadAll(Class<T> model) {
+ return loadAll(model, new OnlyOwnersFilteringStrategy(this.context.getUsername()));
+ }
+
+ @Override
+ public synchronized void delete(Class model, int id) {
+ LOG.debug(String.format("Deleting %s:%d", model.getName(), id));
+ String modelPropName = getItemPropertyName(model, id);
+ clear(modelPropName);
+ }
+
+ @Override
+ public boolean exists(Class model, int id) {
+ return getConfig().containsKey(getItemPropertyName(model, id));
+ }
+
+ private String getIndexPropertyName(Class model) {
+ return String.format("%s:index", model.getName());
+ }
+
+ private String getItemPropertyName(Class model, int id) {
+ return String.format("%s.%d", model.getName(), id);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/LocalKeyValueStorage.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/LocalKeyValueStorage.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/LocalKeyValueStorage.java
index 54dcb7f..8a1952b 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/LocalKeyValueStorage.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/LocalKeyValueStorage.java
@@ -31,31 +31,39 @@ import javax.xml.ws.WebServiceException;
* Path to file should be in 'dataworker.storagePath' parameter
*/
public class LocalKeyValueStorage extends KeyValueStorage {
- private final static Logger LOG =
- LoggerFactory.getLogger(LocalKeyValueStorage.class);
+ private final static Logger LOG =
+ LoggerFactory.getLogger(LocalKeyValueStorage.class);
- private PersistentConfiguration config = null;
+ private PersistentConfiguration config = null;
- public LocalKeyValueStorage(ViewContext context) {
- super(context);
- }
+ /**
+ * Constructor
+ * @param context View Context instance
+ */
+ public LocalKeyValueStorage(ViewContext context) {
+ super(context);
+ }
- @Override
- protected synchronized PersistentConfiguration getConfig() {
- if (config == null) {
- String fileName = context.getProperties().get("dataworker.storagePath");
- if (fileName == null) {
- String msg = "dataworker.storagePath is not configured!";
- LOG.error(msg);
- throw new WebServiceException(msg);
- }
- try {
- config = new PersistentConfiguration(fileName);
- } catch (ConfigurationException e) {
- e.printStackTrace();
- }
- }
- return config;
+ /**
+ * Returns config instance
+ * @return config instance
+ */
+ @Override
+ protected synchronized PersistentConfiguration getConfig() {
+ if (config == null) {
+ String fileName = context.getProperties().get("dataworker.storagePath");
+ if (fileName == null) {
+ String msg = "dataworker.storagePath is not configured!";
+ LOG.error(msg);
+ throw new WebServiceException(msg);
+ }
+ try {
+ config = new PersistentConfiguration(fileName);
+ } catch (ConfigurationException e) {
+ e.printStackTrace();
+ }
}
+ return config;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/PersistentConfiguration.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/PersistentConfiguration.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/PersistentConfiguration.java
index 7e191f2..c3748c7 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/PersistentConfiguration.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/PersistentConfiguration.java
@@ -24,19 +24,28 @@ import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import java.io.File;
+/**
+ * Configuration enables all necessary options for PropertiesConfiguration:
+ * auto-save, auto-reloading, no delimiter parsing and other
+ */
public class PersistentConfiguration extends PropertiesConfiguration {
- public PersistentConfiguration(String fileName) throws ConfigurationException {
- super();
+ /**
+ * Constructor
+ * @param fileName path to data file
+ * @throws ConfigurationException
+ */
+ public PersistentConfiguration(String fileName) throws ConfigurationException {
+ super();
- File config = new File(fileName);
- setFile(config);
- this.setAutoSave(true);
- this.setReloadingStrategy(new FileChangedReloadingStrategy());
- this.setDelimiterParsingDisabled(true);
- this.setListDelimiter((char) 0);
+ File config = new File(fileName);
+ setFile(config);
+ this.setAutoSave(true);
+ this.setReloadingStrategy(new FileChangedReloadingStrategy());
+ this.setDelimiterParsingDisabled(true);
+ this.setListDelimiter((char) 0);
- if (config.exists()) {
- this.load();
- }
+ if (config.exists()) {
+ this.load();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/Storage.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/Storage.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/Storage.java
index 1507918..da9cfc3 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/Storage.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/Storage.java
@@ -24,16 +24,55 @@ import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
import java.util.List;
+/**
+ * Object storage interface
+ */
public interface Storage {
- void store(Indexed obj);
+ /**
+ * Persist object to DB. It should be Indexed
+ * @param obj object to save
+ */
+ void store(Indexed obj);
- <T extends Indexed> T load(Class<T> model, int id) throws ItemNotFound;
+ /**
+ * Load object
+ * @param model bean class
+ * @param id identifier
+ * @param <T> bean class
+ * @return bean instance
+ * @throws ItemNotFound thrown if item with id was not found in DB
+ */
+ <T extends Indexed> T load(Class<T> model, int id) throws ItemNotFound;
- <T extends Indexed> List<T> loadAll(Class<T> model, FilteringStrategy filter);
+ /**
+ * Load all objects of given bean class
+ * @param model bean class
+ * @param filter filtering strategy (return only those objects that conform to the condition)
+ * @param <T> bean class
+ * @return list of filtered objects
+ */
+ <T extends Indexed> List<T> loadAll(Class<T> model, FilteringStrategy filter);
- <T extends Indexed> List<T> loadAll(Class<T> model);
+ /**
+ * Load all objects of given bean class
+ * @param model bean class
+ * @param <T> bean class
+ * @return list of all objects
+ */
+ <T extends Indexed> List<T> loadAll(Class<T> model);
- void delete(Class model, int id);
+ /**
+ * Delete object
+ * @param model bean class
+ * @param id identifier
+ */
+ void delete(Class model, int id);
- boolean exists(Class model, int id);
+ /**
+ * Check if object exists
+ * @param model bean class
+ * @param id identifier
+ * @return true if exists
+ */
+ boolean exists(Class model, int id);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ContextConfigurationAdapter.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ContextConfigurationAdapter.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ContextConfigurationAdapter.java
index 829905d..d49a921 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ContextConfigurationAdapter.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ContextConfigurationAdapter.java
@@ -27,224 +27,231 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+/**
+ * Persistence API to Apache Configuration adapter
+ */
public class ContextConfigurationAdapter implements Configuration {
- private ViewContext context;
-
- public ContextConfigurationAdapter(ViewContext context) {
- this.context = context;
- }
-
- @Override
- public Configuration subset(String prefix) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isEmpty() {
- return context.getInstanceData().isEmpty();
- }
-
- @Override
- public boolean containsKey(String s) {
- return context.getInstanceData().containsKey(s);
- }
-
- @Override
- public void addProperty(String s, Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setProperty(String s, Object o) {
- context.putInstanceData(s, o.toString());
- }
-
- @Override
- public void clearProperty(String key) {
- context.removeInstanceData(key);
- }
-
- @Override
- public void clear() {
- for (String key : context.getInstanceData().keySet())
- context.removeInstanceData(key);
- }
-
- @Override
- public Object getProperty(String key) {
- return context.getInstanceData(key);
- }
-
- @Override
- public Iterator getKeys(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator getKeys() {
- return context.getInstanceData().keySet().iterator();
- }
-
- @Override
- public Properties getProperties(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean getBoolean(String s) {
- return getBoolean(s, null);
- }
-
- @Override
- public boolean getBoolean(String s, boolean b) {
- return getBoolean(s, (Boolean)b);
- }
-
- @Override
- public Boolean getBoolean(String s, Boolean aBoolean) {
- String data = context.getInstanceData(s);
- return (data != null)?Boolean.parseBoolean(data):aBoolean;
- }
-
- @Override
- public byte getByte(String s) {
- return getByte(s, null);
- }
-
- @Override
- public byte getByte(String s, byte b) {
- return getByte(s, (Byte)b);
- }
-
- @Override
- public Byte getByte(String s, Byte aByte) {
- String data = context.getInstanceData(s);
- return (data != null)?Byte.parseByte(data):aByte;
- }
-
- @Override
- public double getDouble(String s) {
- return getDouble(s, null);
- }
-
- @Override
- public double getDouble(String s, double v) {
- return getDouble(s, (Double)v);
- }
-
- @Override
- public Double getDouble(String s, Double aDouble) {
- String data = context.getInstanceData(s);
- return (data != null)?Double.parseDouble(data):aDouble;
- }
-
- @Override
- public float getFloat(String s) {
- return getFloat(s, null);
- }
-
- @Override
- public float getFloat(String s, float v) {
- return getFloat(s, (Float)v);
- }
-
- @Override
- public Float getFloat(String s, Float aFloat) {
- String data = context.getInstanceData(s);
- return (data != null)?Float.parseFloat(data):aFloat;
- }
-
- @Override
- public int getInt(String s) {
- return getInteger(s, null);
- }
-
- @Override
- public int getInt(String s, int i) {
- return getInteger(s, i);
- }
-
- @Override
- public Integer getInteger(String s, Integer integer) {
- String data = context.getInstanceData(s);
- return (data != null)?Integer.parseInt(data):integer;
- }
-
- @Override
- public long getLong(String s) {
- return getLong(s, null);
- }
-
- @Override
- public long getLong(String s, long l) {
- return getLong(s, (Long)l);
- }
-
- @Override
- public Long getLong(String s, Long aLong) {
- String data = context.getInstanceData(s);
- return (data != null)?Long.parseLong(data):aLong;
- }
-
- @Override
- public short getShort(String s) {
- return getShort(s, null);
- }
-
- @Override
- public short getShort(String s, short i) {
- return getShort(s, (Short)i);
- }
-
- @Override
- public Short getShort(String s, Short aShort) {
- String data = context.getInstanceData(s);
- return (data != null)?Short.parseShort(data):aShort;
- }
-
- @Override
- public BigDecimal getBigDecimal(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BigDecimal getBigDecimal(String s, BigDecimal bigDecimal) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BigInteger getBigInteger(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BigInteger getBigInteger(String s, BigInteger bigInteger) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getString(String s) {
- return context.getInstanceData(s);
- }
-
- @Override
- public String getString(String s, String s2) {
- String data = getString(s);
- return (data != null)?data:s2;
- }
-
- @Override
- public String[] getStringArray(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List getList(String s) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List getList(String s, List list) {
- throw new UnsupportedOperationException();
- }
+ private ViewContext context;
+
+ /**
+ * Constructor of adapter
+ * @param context View Context
+ */
+ public ContextConfigurationAdapter(ViewContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Configuration subset(String prefix) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return context.getInstanceData().isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(String s) {
+ return context.getInstanceData().containsKey(s);
+ }
+
+ @Override
+ public void addProperty(String s, Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setProperty(String s, Object o) {
+ context.putInstanceData(s, o.toString());
+ }
+
+ @Override
+ public void clearProperty(String key) {
+ context.removeInstanceData(key);
+ }
+
+ @Override
+ public void clear() {
+ for (String key : context.getInstanceData().keySet())
+ context.removeInstanceData(key);
+ }
+
+ @Override
+ public Object getProperty(String key) {
+ return context.getInstanceData(key);
+ }
+
+ @Override
+ public Iterator getKeys(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator getKeys() {
+ return context.getInstanceData().keySet().iterator();
+ }
+
+ @Override
+ public Properties getProperties(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getBoolean(String s) {
+ return getBoolean(s, null);
+ }
+
+ @Override
+ public boolean getBoolean(String s, boolean b) {
+ return getBoolean(s, (Boolean)b);
+ }
+
+ @Override
+ public Boolean getBoolean(String s, Boolean aBoolean) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Boolean.parseBoolean(data):aBoolean;
+ }
+
+ @Override
+ public byte getByte(String s) {
+ return getByte(s, null);
+ }
+
+ @Override
+ public byte getByte(String s, byte b) {
+ return getByte(s, (Byte)b);
+ }
+
+ @Override
+ public Byte getByte(String s, Byte aByte) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Byte.parseByte(data):aByte;
+ }
+
+ @Override
+ public double getDouble(String s) {
+ return getDouble(s, null);
+ }
+
+ @Override
+ public double getDouble(String s, double v) {
+ return getDouble(s, (Double)v);
+ }
+
+ @Override
+ public Double getDouble(String s, Double aDouble) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Double.parseDouble(data):aDouble;
+ }
+
+ @Override
+ public float getFloat(String s) {
+ return getFloat(s, null);
+ }
+
+ @Override
+ public float getFloat(String s, float v) {
+ return getFloat(s, (Float)v);
+ }
+
+ @Override
+ public Float getFloat(String s, Float aFloat) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Float.parseFloat(data):aFloat;
+ }
+
+ @Override
+ public int getInt(String s) {
+ return getInteger(s, null);
+ }
+
+ @Override
+ public int getInt(String s, int i) {
+ return getInteger(s, i);
+ }
+
+ @Override
+ public Integer getInteger(String s, Integer integer) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Integer.parseInt(data):integer;
+ }
+
+ @Override
+ public long getLong(String s) {
+ return getLong(s, null);
+ }
+
+ @Override
+ public long getLong(String s, long l) {
+ return getLong(s, (Long)l);
+ }
+
+ @Override
+ public Long getLong(String s, Long aLong) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Long.parseLong(data):aLong;
+ }
+
+ @Override
+ public short getShort(String s) {
+ return getShort(s, null);
+ }
+
+ @Override
+ public short getShort(String s, short i) {
+ return getShort(s, (Short)i);
+ }
+
+ @Override
+ public Short getShort(String s, Short aShort) {
+ String data = context.getInstanceData(s);
+ return (data != null)?Short.parseShort(data):aShort;
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String s, BigDecimal bigDecimal) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BigInteger getBigInteger(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BigInteger getBigInteger(String s, BigInteger bigInteger) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getString(String s) {
+ return context.getInstanceData(s);
+ }
+
+ @Override
+ public String getString(String s, String s2) {
+ String data = getString(s);
+ return (data != null)?data:s2;
+ }
+
+ @Override
+ public String[] getStringArray(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List getList(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List getList(String s, List list) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/FilteringStrategy.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/FilteringStrategy.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/FilteringStrategy.java
index 75a0953..acc247d 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/FilteringStrategy.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/FilteringStrategy.java
@@ -18,6 +18,14 @@
package org.apache.ambari.view.pig.persistence.utils;
+/**
+ * Filtering strategy for stored objects
+ */
public interface FilteringStrategy {
- boolean is_conform(Indexed item);
+ /**
+ * Check whether item conforms to the chosen filter or not
+ * @param item item to check
+ * @return true if item conforms this filter
+ */
+ boolean isConform(Indexed item);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Indexed.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Indexed.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Indexed.java
index cbe2016..110260d 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Indexed.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Indexed.java
@@ -18,7 +18,19 @@
package org.apache.ambari.view.pig.persistence.utils;
+/**
+ * Interface to represent item with identifier
+ */
public interface Indexed {
- String getId();
- void setId(String id);
+ /**
+ * Get the ID
+ * @return ID
+ */
+ String getId();
+
+ /**
+ * Set ID
+ * @param id ID
+ */
+ void setId(String id);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ItemNotFound.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ItemNotFound.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ItemNotFound.java
index df56036..00f4d35 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ItemNotFound.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/ItemNotFound.java
@@ -18,5 +18,8 @@
package org.apache.ambari.view.pig.persistence.utils;
+/**
+ * Thrown when item was not found in DB
+ */
public class ItemNotFound extends Exception {
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/OnlyOwnersFilteringStrategy.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/OnlyOwnersFilteringStrategy.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/OnlyOwnersFilteringStrategy.java
index 7964cf7..46801f1 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/OnlyOwnersFilteringStrategy.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/OnlyOwnersFilteringStrategy.java
@@ -19,15 +19,15 @@
package org.apache.ambari.view.pig.persistence.utils;
public class OnlyOwnersFilteringStrategy implements FilteringStrategy {
- private final String username;
+ private final String username;
- public OnlyOwnersFilteringStrategy(String username) {
- this.username = username;
- }
+ public OnlyOwnersFilteringStrategy(String username) {
+ this.username = username;
+ }
- @Override
- public boolean is_conform(Indexed item) {
- Owned object = (Owned) item;
- return object.getOwner().compareTo(username) == 0;
- }
+ @Override
+ public boolean isConform(Indexed item) {
+ Owned object = (Owned) item;
+ return object.getOwner().compareTo(username) == 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Owned.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Owned.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Owned.java
index 30918a2..352f490 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Owned.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/Owned.java
@@ -18,7 +18,19 @@
package org.apache.ambari.view.pig.persistence.utils;
+/**
+ * Interface to represent item with owner
+ */
public interface Owned {
- String getOwner();
- void setOwner(String owner);
+ /**
+ * Get the owner
+ * @return owner
+ */
+ String getOwner();
+
+ /**
+ * Set owner
+ * @param owner owner
+ */
+ void setOwner(String owner);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/StorageUtil.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/StorageUtil.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/StorageUtil.java
index 0c7b25b..245ad54 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/StorageUtil.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/persistence/utils/StorageUtil.java
@@ -25,29 +25,46 @@ import org.apache.ambari.view.pig.persistence.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Storage factory, creates storage of Local or Persistence API type.
+ * Type depends on context configuration: if "dataworker.storagePath" is set,
+ * storage of Local type will be created. Otherwise, Persistence API will be used.
+ *
+ * Storage is singleton.
+ */
public class StorageUtil {
- private static Storage storageInstance = null;
+ private static Storage storageInstance = null;
- protected final static Logger LOG =
- LoggerFactory.getLogger(StorageUtil.class);
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(StorageUtil.class);
- public synchronized static Storage getStorage(ViewContext context) {
- if (storageInstance == null) {
- String fileName = context.getProperties().get("dataworker.storagePath");
- if (fileName != null) {
- LOG.debug("Using local storage in " + fileName + " to store data");
- // If specifed, use LocalKeyValueStorage - key-value file based storage
- storageInstance = new LocalKeyValueStorage(context);
- } else {
- LOG.debug("Using Persistence API to store data");
- // If not specifed, use ambari-views Persistence API
- storageInstance = new InstanceKeyValueStorage(context);
- }
- }
- return storageInstance;
+ /**
+ * Get storage instance. If one is not created, creates instance.
+ * @param context View Context instance
+ * @return storage instance
+ */
+ public synchronized static Storage getStorage(ViewContext context) {
+ if (storageInstance == null) {
+ String fileName = context.getProperties().get("dataworker.storagePath");
+ if (fileName != null) {
+ LOG.debug("Using local storage in " + fileName + " to store data");
+ // If specified, use LocalKeyValueStorage - key-value file based storage
+ storageInstance = new LocalKeyValueStorage(context);
+ } else {
+ LOG.debug("Using Persistence API to store data");
+ // If not specified, use ambari-views Persistence API
+ storageInstance = new InstanceKeyValueStorage(context);
+ }
}
+ return storageInstance;
+ }
- public static void setStorage(Storage storage) {
- storageInstance = storage;
- }
+ /**
+ * Set storage to use across the whole application.
+ * Used in unit tests.
+ * @param storage storage instance
+ */
+ public static void setStorage(Storage storage) {
+ storageInstance = storage;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/CRUDResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/CRUDResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/CRUDResourceManager.java
index 9319d3b..b4a2059 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/CRUDResourceManager.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/CRUDResourceManager.java
@@ -32,60 +32,92 @@ import java.util.List;
* @param <T> Data type with ID
*/
abstract public class CRUDResourceManager<T extends Indexed> {
- private Storage storage = null;
+ private Storage storage = null;
- protected final Class<T> resourceClass;
+ protected final Class<T> resourceClass;
- public CRUDResourceManager(Class<T> responseClass) {
- this.resourceClass = responseClass;
- }
- // CRUD operations
+ /**
+ * Constructor
+ * @param responseClass model class
+ */
+ public CRUDResourceManager(Class<T> responseClass) {
+ this.resourceClass = responseClass;
+ }
+ // CRUD operations
- public T create(T object) {
- object.setId(null);
- return this.save(object);
- }
+ /**
+ * Create operation
+ * @param object object
+ * @return model object
+ */
+ public T create(T object) {
+ object.setId(null);
+ return this.save(object);
+ }
- public T read(String id) throws ItemNotFound {
- T object = null;
- object = getPigStorage().load(this.resourceClass, Integer.parseInt(id));
- if (!checkPermissions(object))
- throw new ItemNotFound();
- return object;
- }
+ /**
+ * Read operation
+ * @param id identifier
+ * @return model object
+ * @throws ItemNotFound
+ */
+ public T read(String id) throws ItemNotFound {
+ T object = null;
+ object = getPigStorage().load(this.resourceClass, Integer.parseInt(id));
+ if (!checkPermissions(object))
+ throw new ItemNotFound();
+ return object;
+ }
- public List<T> readAll(FilteringStrategy filteringStrategy) {
- return getPigStorage().loadAll(this.resourceClass, filteringStrategy);
- }
+ /**
+ * Read all objects
+ * @param filteringStrategy filtering strategy
+ * @return list of filtered objects
+ */
+ public List<T> readAll(FilteringStrategy filteringStrategy) {
+ return getPigStorage().loadAll(this.resourceClass, filteringStrategy);
+ }
- public T update(T newObject, String id) throws ItemNotFound {
- newObject.setId(id);
- this.save(newObject);
- return newObject;
- }
+ /**
+ * Update operation
+ * @param newObject new object
+ * @param id identifier of previous object
+ * @return model object
+ * @throws ItemNotFound
+ */
+ public T update(T newObject, String id) throws ItemNotFound {
+ newObject.setId(id);
+ this.save(newObject);
+ return newObject;
+ }
- public void delete(String resourceId) throws ItemNotFound {
- int id = Integer.parseInt(resourceId);
- if (!getPigStorage().exists(this.resourceClass, id)) {
- throw new ItemNotFound();
- }
- getPigStorage().delete(this.resourceClass, id);
+ /**
+ * Delete operation
+ * @param resourceId object identifier
+ * @throws ItemNotFound
+ */
+ public void delete(String resourceId) throws ItemNotFound {
+ int id = Integer.parseInt(resourceId);
+ if (!getPigStorage().exists(this.resourceClass, id)) {
+ throw new ItemNotFound();
}
+ getPigStorage().delete(this.resourceClass, id);
+ }
- // UTILS
+ // UTILS
- protected T save(T object) {
- getPigStorage().store(object);
- return object;
- }
+ protected T save(T object) {
+ getPigStorage().store(object);
+ return object;
+ }
- protected Storage getPigStorage() {
- if (storage == null) {
- storage = StorageUtil.getStorage(getContext());
- }
- return storage;
+ protected Storage getPigStorage() {
+ if (storage == null) {
+ storage = StorageUtil.getStorage(getContext());
}
+ return storage;
+ }
- protected abstract boolean checkPermissions(T object);
- protected abstract ViewContext getContext();
+ protected abstract boolean checkPermissions(T object);
+ protected abstract ViewContext getContext();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/PersonalCRUDResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/PersonalCRUDResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/PersonalCRUDResourceManager.java
index ffccfd9..650d9a2 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/PersonalCRUDResourceManager.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/PersonalCRUDResourceManager.java
@@ -29,53 +29,62 @@ import java.util.concurrent.Callable;
* @param <T> Data type with ID and Owner
*/
public class PersonalCRUDResourceManager<T extends PersonalResource> extends CRUDResourceManager<T> {
- protected ViewContext context;
- protected boolean ignorePermissions = false;
+ protected ViewContext context;
+ protected boolean ignorePermissions = false;
- public PersonalCRUDResourceManager(Class<T> responseClass, ViewContext context) {
- super(responseClass);
- this.context = context;
- }
-
- public T update(T newObject, String id) throws ItemNotFound {
- T object = getPigStorage().load(this.resourceClass, Integer.parseInt(id));
- if (object.getOwner().compareTo(this.context.getUsername()) != 0) {
- throw new ItemNotFound();
- }
+ /**
+ * Constructor
+ * @param responseClass model class
+ * @param context View Context instance
+ */
+ public PersonalCRUDResourceManager(Class<T> responseClass, ViewContext context) {
+ super(responseClass);
+ this.context = context;
+ }
- newObject.setOwner(this.context.getUsername());
- return super.update(newObject, id);
+ @Override
+ public T update(T newObject, String id) throws ItemNotFound {
+ T object = getPigStorage().load(this.resourceClass, Integer.parseInt(id));
+ if (object.getOwner().compareTo(this.context.getUsername()) != 0) {
+ throw new ItemNotFound();
}
- public T save(T object) {
- object.setOwner(this.context.getUsername());
- return super.save(object);
- }
+ newObject.setOwner(this.context.getUsername());
+ return super.update(newObject, id);
+ }
- @Override
- protected boolean checkPermissions(T object) {
- if (ignorePermissions)
- return true;
- return object.getOwner().compareTo(this.context.getUsername()) == 0;
- }
+ @Override
+ public T save(T object) {
+ object.setOwner(this.context.getUsername());
+ return super.save(object);
+ }
- @Override
- protected ViewContext getContext() {
- return context;
- }
+ @Override
+ protected boolean checkPermissions(T object) {
+ if (ignorePermissions)
+ return true;
+ return object.getOwner().compareTo(this.context.getUsername()) == 0;
+ }
- public <T> T ignorePermissions(String fakeUser, Callable<T> actions) throws Exception {
- ignorePermissions = true;
- T result;
- try {
- result = actions.call();
- } finally {
- ignorePermissions = false;
- }
- return result;
- }
+ @Override
+ protected ViewContext getContext() {
+ return context;
+ }
- public <T> T ignorePermissions(Callable<T> actions) throws Exception {
- return ignorePermissions("", actions);
+ /**
+ * Execute action ignoring objects owner
+ * @param actions callable to execute
+ * @return value returned from actions
+ * @throws Exception
+ */
+ public <T> T ignorePermissions(Callable<T> actions) throws Exception {
+ ignorePermissions = true;
+ T result;
+ try {
+ result = actions.call();
+ } finally {
+ ignorePermissions = false;
}
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/SharedCRUDResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/SharedCRUDResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/SharedCRUDResourceManager.java
index be89d36..3b7a173 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/SharedCRUDResourceManager.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/SharedCRUDResourceManager.java
@@ -26,20 +26,25 @@ import org.apache.ambari.view.pig.persistence.utils.Indexed;
* @param <T> Data type with ID
*/
public class SharedCRUDResourceManager<T extends Indexed> extends CRUDResourceManager<T> {
- protected ViewContext context;
+ protected ViewContext context;
- public SharedCRUDResourceManager(Class<T> responseClass, ViewContext context) {
- super(responseClass);
- this.context = context;
- }
+ /**
+ * Constructor
+ * @param responseClass model class
+ * @param context View Context instance
+ */
+ public SharedCRUDResourceManager(Class<T> responseClass, ViewContext context) {
+ super(responseClass);
+ this.context = context;
+ }
- @Override
- protected boolean checkPermissions(T object) {
- return true; //everyone has permission
- }
+ @Override
+ protected boolean checkPermissions(T object) {
+ return true; //everyone has permission
+ }
- @Override
- protected ViewContext getContext() {
- return context;
- }
+ @Override
+ protected ViewContext getContext() {
+ return context;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileResource.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileResource.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileResource.java
index 9ae45c7..ef8b2ad 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileResource.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileResource.java
@@ -18,10 +18,53 @@
package org.apache.ambari.view.pig.resources.files;
+/**
+ * File bean
+ */
public class FileResource {
- public String filePath;
- public String fileContent;
- public boolean hasNext;
- public long page;
- public long pageCount;
+ private String filePath;
+ private String fileContent;
+ private boolean hasNext;
+ private long page;
+ private long pageCount;
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public String getFileContent() {
+ return fileContent;
+ }
+
+ public void setFileContent(String fileContent) {
+ this.fileContent = fileContent;
+ }
+
+ public boolean isHasNext() {
+ return hasNext;
+ }
+
+ public void setHasNext(boolean hasNext) {
+ this.hasNext = hasNext;
+ }
+
+ public long getPage() {
+ return page;
+ }
+
+ public void setPage(long page) {
+ this.page = page;
+ }
+
+ public long getPageCount() {
+ return pageCount;
+ }
+
+ public void setPageCount(long pageCount) {
+ this.pageCount = pageCount;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileService.java
index c36c582..292c1be 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileService.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/files/FileService.java
@@ -47,95 +47,98 @@ import java.io.IOException;
* update file content
*/
public class FileService extends BaseService {
- @Inject
- ViewResourceHandler handler;
+ @Inject
+ ViewResourceHandler handler;
- protected final static Logger LOG =
- LoggerFactory.getLogger(FileService.class);
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(FileService.class);
- /**
- * Get single item
- */
- @GET
- @Path("{filePath:.*}")
- @Produces(MediaType.APPLICATION_JSON)
- public Response getFile(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException {
- LOG.debug("Reading file " + filePath);
- try {
- FilePaginator paginator = new FilePaginator(filePath, context);
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{filePath:.*}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getFile(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException {
+ LOG.debug("Reading file " + filePath);
+ try {
+ FilePaginator paginator = new FilePaginator(filePath, context);
- if (page == null)
- page = 0L;
+ if (page == null)
+ page = 0L;
- FileResource file = new FileResource();
- file.filePath = filePath;
- file.fileContent = paginator.readPage(page);
- file.hasNext = paginator.pageCount() > page + 1;
- file.page = page;
- file.pageCount = paginator.pageCount();
+ FileResource file = new FileResource();
+ file.setFilePath(filePath);
+ file.setFileContent(paginator.readPage(page));
+ file.setHasNext(paginator.pageCount() > page + 1);
+ file.setPage(page);
+ file.setPageCount(paginator.pageCount());
- JSONObject object = new JSONObject();
- object.put("file", file);
- return Response.ok(object).status(200).build();
- } catch (FileNotFoundException e) {
- return notFoundResponse(e.toString());
- } catch (IllegalArgumentException e) {
- return badRequestResponse(e.toString());
- }
+ JSONObject object = new JSONObject();
+ object.put("file", file);
+ return Response.ok(object).status(200).build();
+ } catch (FileNotFoundException e) {
+ return notFoundResponse(e.toString());
+ } catch (IllegalArgumentException e) {
+ return badRequestResponse(e.toString());
}
+ }
- /**
- * Delete single item
- */
- @DELETE
- @Path("{filePath:.*}")
- public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException {
- LOG.debug("Deleting file " + filePath);
- if (getHdfsApi().delete(filePath, false)) {
- return Response.status(204).build();
- }
- return notFoundResponse("FileSystem.delete returned false");
+ /**
+ * Delete single item
+ */
+ @DELETE
+ @Path("{filePath:.*}")
+ public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException {
+ LOG.debug("Deleting file " + filePath);
+ if (getHdfsApi().delete(filePath, false)) {
+ return Response.status(204).build();
}
+ return notFoundResponse("FileSystem.delete returned false");
+ }
- /**
- * Update item
- */
- @PUT
- @Path("{filePath:.*}")
- @Consumes(MediaType.APPLICATION_JSON)
- public Response updateFile(FileResourceRequest request,
- @PathParam("filePath") String filePath) throws IOException, InterruptedException {
- LOG.debug("Rewriting file " + filePath);
- FSDataOutputStream output = getHdfsApi().create(filePath, true);
- output.writeBytes(request.file.fileContent);
- output.close();
- return Response.status(204).build();
- }
+ /**
+ * Update item
+ */
+ @PUT
+ @Path("{filePath:.*}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response updateFile(FileResourceRequest request,
+ @PathParam("filePath") String filePath) throws IOException, InterruptedException {
+ LOG.debug("Rewriting file " + filePath);
+ FSDataOutputStream output = getHdfsApi().create(filePath, true);
+ output.writeBytes(request.file.getFileContent());
+ output.close();
+ return Response.status(204).build();
+ }
- /**
- * Create script
- */
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- public Response createFile(FileResourceRequest request,
- @Context HttpServletResponse response, @Context UriInfo ui)
- throws IOException, InterruptedException {
- LOG.debug("Creating file " + request.file.filePath);
- try {
- FSDataOutputStream output = getHdfsApi().create(request.file.filePath, false);
- if (request.file.fileContent != null) {
- output.writeBytes(request.file.fileContent);
- }
- output.close();
- } catch (FileAlreadyExistsException e) {
- return badRequestResponse(e.toString());
- }
- response.setHeader("Location",
- String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.filePath));
- return Response.status(204).build();
+ /**
+ * Create script
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response createFile(FileResourceRequest request,
+ @Context HttpServletResponse response, @Context UriInfo ui)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating file " + request.file.getFilePath());
+ try {
+ FSDataOutputStream output = getHdfsApi().create(request.file.getFilePath(), false);
+ if (request.file.getFileContent() != null) {
+ output.writeBytes(request.file.getFileContent());
+ }
+ output.close();
+ } catch (FileAlreadyExistsException e) {
+ return badRequestResponse(e.toString());
}
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.getFilePath()));
+ return Response.status(204).build();
+ }
- public static class FileResourceRequest {
- public FileResource file;
- }
+ /**
+ * Wrapper object for json mapping
+ */
+ public static class FileResourceRequest {
+ public FileResource file;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c64261e2/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobResourceManager.java
index 67a038b..abc2ddd 100644
--- a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobResourceManager.java
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobResourceManager.java
@@ -44,237 +44,257 @@ import java.util.regex.Pattern;
* CRUD overridden to support
*/
public class JobResourceManager extends PersonalCRUDResourceManager<PigJob> {
- protected TempletonApi api;
+ protected TempletonApi api;
- private final static Logger LOG =
- LoggerFactory.getLogger(JobResourceManager.class);
+ private final static Logger LOG =
+ LoggerFactory.getLogger(JobResourceManager.class);
- public JobResourceManager(ViewContext context) {
- super(PigJob.class, context);
- setupPolling();
- }
+ /**
+ * Constructor
+ * @param context View Context instance
+ */
+ public JobResourceManager(ViewContext context) {
+ super(PigJob.class, context);
+ setupPolling();
+ }
- public TempletonApi getTempletonApi() {
- if (api == null) {
- api = new TempletonApi(context.getProperties().get("dataworker.templeton_url"),
- getTempletonUser(), getTempletonUser(), context);
- }
- return api;
+ /**
+ * Get templeton api business delegate
+ * @return templeton api business delegate
+ */
+ public TempletonApi getTempletonApi() {
+ if (api == null) {
+ api = new TempletonApi(context.getProperties().get("dataworker.templeton_url"),
+ getTempletonUser(), getTempletonUser(), context);
}
+ return api;
+ }
- public void setTempletonApi(TempletonApi api) {
- this.api = api;
- }
+ /**
+ * Set templeton api business delegate
+ * @param api templeton api business delegate
+ */
+ public void setTempletonApi(TempletonApi api) {
+ this.api = api;
+ }
- private void setupPolling() {
- List<PigJob> notCompleted = this.readAll(new FilteringStrategy() {
- @Override
- public boolean is_conform(Indexed item) {
- PigJob job = (PigJob) item;
- return job.isInProgress();
- }
- });
+ private void setupPolling() {
+ List<PigJob> notCompleted = this.readAll(new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ PigJob job = (PigJob) item;
+ return job.isInProgress();
+ }
+ });
- for(PigJob job : notCompleted) {
- JobPolling.pollJob(context, job);
- }
+ for(PigJob job : notCompleted) {
+ JobPolling.pollJob(context, job);
}
+ }
- @Override
- public PigJob create(PigJob object) {
- object.setStatus(PigJob.Status.SUBMITTING);
- PigJob job = super.create(object);
- LOG.debug("Submitting job...");
+ @Override
+ public PigJob create(PigJob object) {
+ object.setStatus(PigJob.Status.SUBMITTING);
+ PigJob job = super.create(object);
+ LOG.debug("Submitting job...");
- try {
- submitJob(object);
- } catch (RuntimeException e) {
- object.setStatus(PigJob.Status.SUBMIT_FAILED);
- save(object);
- LOG.debug("Job submit FAILED");
- throw e;
- }
- LOG.debug("Job submit OK");
- object.setStatus(PigJob.Status.SUBMITTED);
- save(object);
- return job;
+ try {
+ submitJob(object);
+ } catch (RuntimeException e) {
+ object.setStatus(PigJob.Status.SUBMIT_FAILED);
+ save(object);
+ LOG.debug("Job submit FAILED");
+ throw e;
}
+ LOG.debug("Job submit OK");
+ object.setStatus(PigJob.Status.SUBMITTED);
+ save(object);
+ return job;
+ }
- public void killJob(PigJob object) throws IOException {
- LOG.debug("Killing job...");
+ /**
+ * Kill Templeton Job
+ * @param object job object
+ * @throws IOException network error
+ */
+ public void killJob(PigJob object) throws IOException {
+ LOG.debug("Killing job...");
- try {
- getTempletonApi().killJob(object.getJobId());
- } catch (IOException e) {
- LOG.debug("Job kill FAILED");
- throw e;
- }
- LOG.debug("Job kill OK");
+ try {
+ getTempletonApi().killJob(object.getJobId());
+ } catch (IOException e) {
+ LOG.debug("Job kill FAILED");
+ throw e;
}
+ LOG.debug("Job kill OK");
+ }
- /**
- * Running job
- * @param job job bean
- */
- private void submitJob(PigJob job) {
- String date = new SimpleDateFormat("dd-MM-yyyy-HH-mm-ss").format(new Date());
- String statusdir = String.format(context.getProperties().get("dataworker.pigJobsPath") +
- "/%s/%s_%s", getTempletonUser(),
- job.getTitle().toLowerCase().replaceAll("[^a-zA-Z0-9 ]+", "").replace(" ", "_"),
- date);
+ /**
+ * Running job
+ * @param job job bean
+ */
+ private void submitJob(PigJob job) {
+ String date = new SimpleDateFormat("dd-MM-yyyy-HH-mm-ss").format(new Date());
+ String statusdir = String.format(context.getProperties().get("dataworker.pigJobsPath") +
+ "/%s/%s_%s", getTempletonUser(),
+ job.getTitle().toLowerCase().replaceAll("[^a-zA-Z0-9 ]+", "").replace(" ", "_"),
+ date);
- String newPigScriptPath = statusdir + "/script.pig";
- String newSourceFilePath = statusdir + "/source.pig";
- String newPythonScriptPath = statusdir + "/udf.py";
- String templetonParamsFilePath = statusdir + "/params";
- try {
- // additional file can be passed to copy into work directory
- if (job.getSourceFileContent() != null && !job.getSourceFileContent().isEmpty()) {
- String sourceFileContent = job.getSourceFileContent();
- job.setSourceFileContent(null); // we should not store content in DB
- save(job);
+ String newPigScriptPath = statusdir + "/script.pig";
+ String newSourceFilePath = statusdir + "/source.pig";
+ String newPythonScriptPath = statusdir + "/udf.py";
+ String templetonParamsFilePath = statusdir + "/params";
+ try {
+ // additional file can be passed to copy into work directory
+ if (job.getSourceFileContent() != null && !job.getSourceFileContent().isEmpty()) {
+ String sourceFileContent = job.getSourceFileContent();
+ job.setSourceFileContent(null); // we should not store content in DB
+ save(job);
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newSourceFilePath, true);
- stream.writeBytes(sourceFileContent);
- stream.close();
- } else {
- if (job.getSourceFile() != null && !job.getSourceFile().isEmpty()) {
- // otherwise, just copy original file
- if (!BaseService.getHdfsApi(context).copy(job.getSourceFile(), newSourceFilePath)) {
- throw new WebServiceException("Can't copy source file from " + job.getSourceFile() +
- " to " + newPigScriptPath);
- }
- }
- }
- } catch (IOException e) {
- throw new WebServiceException("Can't create/copy source file: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new WebServiceException("Can't create/copy source file: " + e.toString(), e);
+ FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newSourceFilePath, true);
+ stream.writeBytes(sourceFileContent);
+ stream.close();
+ } else {
+ if (job.getSourceFile() != null && !job.getSourceFile().isEmpty()) {
+ // otherwise, just copy original file
+ if (!BaseService.getHdfsApi(context).copy(job.getSourceFile(), newSourceFilePath)) {
+ throw new WebServiceException("Can't copy source file from " + job.getSourceFile() +
+ " to " + newPigScriptPath);
+ }
}
+ }
+ } catch (IOException e) {
+ throw new WebServiceException("Can't create/copy source file: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new WebServiceException("Can't create/copy source file: " + e.toString(), e);
+ }
- try {
- // content can be passed from front-end with substituted arguments
- if (job.getForcedContent() != null && !job.getForcedContent().isEmpty()) {
- String forcedContent = job.getForcedContent();
- // variable for sourceFile can be passed from front-ent
- forcedContent = forcedContent.replace("${sourceFile}",
- context.getProperties().get("dataworker.defaultFs") + newSourceFilePath);
- job.setForcedContent(null); // we should not store content in DB
- save(job);
-
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newPigScriptPath, true);
- stream.writeBytes(forcedContent);
- stream.close();
- } else {
- // otherwise, just copy original file
- if (!BaseService.getHdfsApi(context).copy(job.getPigScript(), newPigScriptPath)) {
- throw new WebServiceException("Can't copy pig script file from " + job.getPigScript() +
- " to " + newPigScriptPath);
- }
- }
- } catch (IOException e) {
- throw new WebServiceException("Can't create/copy pig script file: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new WebServiceException("Can't create/copy pig script file: " + e.toString(), e);
- }
+ try {
+ // content can be passed from front-end with substituted arguments
+ if (job.getForcedContent() != null && !job.getForcedContent().isEmpty()) {
+ String forcedContent = job.getForcedContent();
+ // variable for sourceFile can be passed from front-ent
+ forcedContent = forcedContent.replace("${sourceFile}",
+ context.getProperties().get("dataworker.defaultFs") + newSourceFilePath);
+ job.setForcedContent(null); // we should not store content in DB
+ save(job);
- if (job.getPythonScript() != null && !job.getPythonScript().isEmpty()) {
- try {
- if (!BaseService.getHdfsApi(context).copy(job.getPythonScript(), newPythonScriptPath)) {
- throw new WebServiceException("Can't copy python udf script file from " + job.getPythonScript() +
- " to " + newPythonScriptPath);
- }
- } catch (IOException e) {
- throw new WebServiceException("Can't create/copy python udf file: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new WebServiceException("Can't create/copy python udf file: " + e.toString(), e);
- }
+ FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newPigScriptPath, true);
+ stream.writeBytes(forcedContent);
+ stream.close();
+ } else {
+ // otherwise, just copy original file
+ if (!BaseService.getHdfsApi(context).copy(job.getPigScript(), newPigScriptPath)) {
+ throw new WebServiceException("Can't copy pig script file from " + job.getPigScript() +
+ " to " + newPigScriptPath);
}
+ }
+ } catch (IOException e) {
+ throw new WebServiceException("Can't create/copy pig script file: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new WebServiceException("Can't create/copy pig script file: " + e.toString(), e);
+ }
- try {
- FSDataOutputStream stream = BaseService.getHdfsApi(context).create(templetonParamsFilePath, true);
- if (job.getTempletonArguments() != null) {
- stream.writeBytes(job.getTempletonArguments());
- }
- stream.close();
- } catch (IOException e) {
- throw new WebServiceException("Can't create params file: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new WebServiceException("Can't create params file: " + e.toString(), e);
+ if (job.getPythonScript() != null && !job.getPythonScript().isEmpty()) {
+ try {
+ if (!BaseService.getHdfsApi(context).copy(job.getPythonScript(), newPythonScriptPath)) {
+ throw new WebServiceException("Can't copy python udf script file from " + job.getPythonScript() +
+ " to " + newPythonScriptPath);
}
- job.setPigScript(newPigScriptPath);
+ } catch (IOException e) {
+ throw new WebServiceException("Can't create/copy python udf file: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new WebServiceException("Can't create/copy python udf file: " + e.toString(), e);
+ }
+ }
- job.setStatusDir(statusdir);
- job.setDateStarted(System.currentTimeMillis() / 1000L);
+ try {
+ FSDataOutputStream stream = BaseService.getHdfsApi(context).create(templetonParamsFilePath, true);
+ if (job.getTempletonArguments() != null) {
+ stream.writeBytes(job.getTempletonArguments());
+ }
+ stream.close();
+ } catch (IOException e) {
+ throw new WebServiceException("Can't create params file: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new WebServiceException("Can't create params file: " + e.toString(), e);
+ }
+ job.setPigScript(newPigScriptPath);
- TempletonApi.JobData data = null;
- try {
- data = getTempletonApi().runPigQuery(new File(job.getPigScript()), statusdir, job.getTempletonArguments());
- } catch (IOException templetonBadResponse) {
- String msg = String.format("Templeton bad response: %s", templetonBadResponse.toString());
- LOG.debug(msg);
- throw new WebServiceException(msg, templetonBadResponse);
- }
- job.setJobId(data.id);
+ job.setStatusDir(statusdir);
+ job.setDateStarted(System.currentTimeMillis() / 1000L);
- JobPolling.pollJob(context, job);
+ TempletonApi.JobData data = null;
+ try {
+ data = getTempletonApi().runPigQuery(new File(job.getPigScript()), statusdir, job.getTempletonArguments());
+ } catch (IOException templetonBadResponse) {
+ String msg = String.format("Templeton bad response: %s", templetonBadResponse.toString());
+ LOG.debug(msg);
+ throw new WebServiceException(msg, templetonBadResponse);
}
+ job.setJobId(data.id);
- public void retrieveJobStatus(PigJob job) {
- TempletonApi.JobInfo info = null;
- try {
- info = getTempletonApi().checkJob(job.getJobId());
- } catch (IOException e) {
- LOG.warn(String.format("IO Exception: %s", e));
- return;
- }
+ JobPolling.pollJob(context, job);
+ }
- if (info.status != null && (info.status.containsKey("runState"))) {
- //TODO: retrieve from RM
- int runState = ((Double) info.status.get("runState")).intValue();
- switch (runState) {
- case PigJob.RUN_STATE_KILLED:
- LOG.debug(String.format("Job KILLED: %s", job.getJobId()));
- job.setStatus(PigJob.Status.KILLED);
- break;
- case PigJob.RUN_STATE_FAILED:
- LOG.debug(String.format("Job FAILED: %s", job.getJobId()));
- job.setStatus(PigJob.Status.FAILED);
- break;
- case PigJob.RUN_STATE_PREP:
- case PigJob.RUN_STATE_RUNNING:
- job.setStatus(PigJob.Status.RUNNING);
- break;
- case PigJob.RUN_STATE_SUCCEEDED:
- LOG.debug(String.format("Job COMPLETED: %s", job.getJobId()));
- job.setStatus(PigJob.Status.COMPLETED);
- break;
- default:
- LOG.debug(String.format("Job in unknown state: %s", job.getJobId()));
- job.setStatus(PigJob.Status.UNKNOWN);
- break;
- }
- }
- Pattern pattern = Pattern.compile("\\d+");
- Matcher matcher = null;
- if (info.percentComplete != null) {
- matcher = pattern.matcher(info.percentComplete);
- }
- if (matcher != null && matcher.find()) {
- job.setPercentComplete(Integer.valueOf(matcher.group()));
- } else {
- job.setPercentComplete(null);
- }
- save(job);
+ /**
+ * Get job status
+ *
+ * Asks Templeton (getTempletonApi().checkJob) about the job's id, maps the
+ * reported "runState" onto a PigJob.Status, derives percentComplete from the
+ * first run of digits in the reported percentComplete string (null when that
+ * string is absent or has no digits) and then saves the job.
+ * If checkJob throws an IOException, a warning is logged and the method
+ * returns early: the job is neither modified nor saved.
+ * @param job job object
+ */
+ public void retrieveJobStatus(PigJob job) {
+ TempletonApi.JobInfo info = null;
+ try {
+ info = getTempletonApi().checkJob(job.getJobId());
+ } catch (IOException e) {
+ LOG.warn(String.format("IO Exception: %s", e));
+ return; // best effort: on IOException leave the job as it was, without saving
}
- /**
- * Extension point to use different usernames in templeton
- * requests instead of logged in user
- * @return username in templeton
- */
- private String getTempletonUser() {
- return context.getProperties().get("dataworker.templeton_user");
-// return context.getTempletonUser();
+ if (info.status != null && (info.status.containsKey("runState"))) { // without a runState the job's status is left unchanged
+ //TODO: retrieve from RM
+ int runState = ((Double) info.status.get("runState")).intValue(); // NOTE(review): assumes runState arrives as a Double; another type would fail this cast - confirm
+ switch (runState) {
+ case PigJob.RUN_STATE_KILLED:
+ LOG.debug(String.format("Job KILLED: %s", job.getJobId()));
+ job.setStatus(PigJob.Status.KILLED);
+ break;
+ case PigJob.RUN_STATE_FAILED:
+ LOG.debug(String.format("Job FAILED: %s", job.getJobId()));
+ job.setStatus(PigJob.Status.FAILED);
+ break;
+ case PigJob.RUN_STATE_PREP: // PREP falls through and is reported as RUNNING
+ case PigJob.RUN_STATE_RUNNING:
+ job.setStatus(PigJob.Status.RUNNING);
+ break;
+ case PigJob.RUN_STATE_SUCCEEDED:
+ LOG.debug(String.format("Job COMPLETED: %s", job.getJobId()));
+ job.setStatus(PigJob.Status.COMPLETED);
+ break;
+ default:
+ LOG.debug(String.format("Job in unknown state: %s", job.getJobId()));
+ job.setStatus(PigJob.Status.UNKNOWN);
+ break;
+ }
+ }
+ Pattern pattern = Pattern.compile("\\d+"); // matches the first run of decimal digits
+ Matcher matcher = null;
+ if (info.percentComplete != null) {
+ matcher = pattern.matcher(info.percentComplete);
}
+ if (matcher != null && matcher.find()) {
+ job.setPercentComplete(Integer.valueOf(matcher.group()));
+ } else {
+ job.setPercentComplete(null); // percentComplete missing or contains no digits
+ }
+ save(job); // save the job with its updated status and percentComplete
+ }
+
+ /**
+ * Extension point to use different usernames in templeton
+ * requests instead of logged in user
+ *
+ * The name is looked up under the "dataworker.templeton_user" key of the
+ * context properties and returned as-is.
+ * NOTE(review): presumably null when the property is not set - confirm.
+ * @return username in templeton
+ */
+ private String getTempletonUser() {
+ return context.getProperties().get("dataworker.templeton_user");
+ }
}