You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:28 UTC
[09/50] incubator-apex-core git commit: APEXCORE-283 #comment added
storage agent interface and stram client changes to retrieve application
attributes
APEXCORE-283 #comment added storage agent interface and stram client
changes to retrieve application attributes
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/02c43eea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/02c43eea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/02c43eea
Branch: refs/heads/master
Commit: 02c43eea28d27fd71fe3f551dfaceb0c3c931db8
Parents: 0d5bfa5
Author: Ashish Tadose <as...@gmail..com>
Authored: Fri Dec 18 01:23:46 2015 +0530
Committer: Ashish Tadose <as...@gmail..com>
Committed: Fri Dec 18 02:25:12 2015 +0530
----------------------------------------------------------------------
.../java/com/datatorrent/api/StorageAgent.java | 18 ++++++++++
.../java/com/datatorrent/stram/StramClient.java | 38 ++++++++++++++++----
2 files changed, 50 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/api/src/main/java/com/datatorrent/api/StorageAgent.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StorageAgent.java b/api/src/main/java/com/datatorrent/api/StorageAgent.java
index e155ed3..b5dcf39 100644
--- a/api/src/main/java/com/datatorrent/api/StorageAgent.java
+++ b/api/src/main/java/com/datatorrent/api/StorageAgent.java
@@ -20,6 +20,8 @@ package com.datatorrent.api;
import java.io.IOException;
+import com.datatorrent.api.Attribute.AttributeMap;
+
/**
* Interface to define writing/reading checkpoint state of any operator.
*
@@ -77,4 +79,20 @@ public interface StorageAgent
*/
public long[] getWindowIds(int operatorId) throws IOException;
+ /**
+ * Interface to pass application attributes to storage agent
+ *
+ *
+ */
+ public interface ApplicationAwareStorageAgent extends StorageAgent
+ {
+
+ /**
+ * Passes attributes of application to storage agent
+ *
+ * @param map attributes of application
+ */
+ public void setApplicationAttributes(AttributeMap map);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 9a570e0..046a56c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -24,10 +24,12 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +40,29 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -54,7 +71,11 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.DTLoggerFactory;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.StorageAgent;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
import com.datatorrent.stram.client.StramClientUtils;
@@ -455,6 +476,11 @@ public class StramClient
}
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString());
+ StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+ if (agent != null && agent instanceof StorageAgent.ApplicationAwareStorageAgent) {
+ ((StorageAgent.ApplicationAwareStorageAgent)agent).setApplicationAttributes(dag.getAttributes());
+ }
+
if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */
Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS);
// use conf client side to pickup any proxy settings from dt-site.xml