You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:28 UTC

[09/50] incubator-apex-core git commit: APEXCORE-283 #comment added storage agent interface and stram client changes to retrieve application attributes

APEXCORE-283 #comment added storage agent interface and stram client
changes to retrieve application attributes


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/02c43eea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/02c43eea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/02c43eea

Branch: refs/heads/master
Commit: 02c43eea28d27fd71fe3f551dfaceb0c3c931db8
Parents: 0d5bfa5
Author: Ashish Tadose <as...@gmail..com>
Authored: Fri Dec 18 01:23:46 2015 +0530
Committer: Ashish Tadose <as...@gmail..com>
Committed: Fri Dec 18 02:25:12 2015 +0530

----------------------------------------------------------------------
 .../java/com/datatorrent/api/StorageAgent.java  | 18 ++++++++++
 .../java/com/datatorrent/stram/StramClient.java | 38 ++++++++++++++++----
 2 files changed, 50 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/api/src/main/java/com/datatorrent/api/StorageAgent.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/StorageAgent.java b/api/src/main/java/com/datatorrent/api/StorageAgent.java
index e155ed3..b5dcf39 100644
--- a/api/src/main/java/com/datatorrent/api/StorageAgent.java
+++ b/api/src/main/java/com/datatorrent/api/StorageAgent.java
@@ -20,6 +20,8 @@ package com.datatorrent.api;
 
 import java.io.IOException;
 
+import com.datatorrent.api.Attribute.AttributeMap;
+
 /**
  * Interface to define writing/reading checkpoint state of any operator.
  *
@@ -77,4 +79,20 @@ public interface StorageAgent
    */
   public long[] getWindowIds(int operatorId) throws IOException;
 
+  /**
+   * A {@link StorageAgent} that also accepts the application's attributes.
+   * StramClient hands the DAG attributes to the agent configured under
+   * OperatorContext.STORAGE_AGENT when that agent implements this interface.
+   */
+  public interface ApplicationAwareStorageAgent extends StorageAgent
+  {
+   
+    /**
+     * Passes the attributes of the application to the storage agent.
+     *
+     * @param map attributes of the application
+     */
+    public void setApplicationAttributes(AttributeMap map);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 9a570e0..046a56c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -24,10 +24,12 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +40,29 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -54,7 +71,11 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.DTLoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.StorageAgent;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
 import com.datatorrent.stram.client.StramClientUtils;
@@ -455,6 +476,11 @@ public class StramClient
       }
 
       dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString());
+      StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+      if (agent != null && agent instanceof StorageAgent.ApplicationAwareStorageAgent) {
+        ((StorageAgent.ApplicationAwareStorageAgent)agent).setApplicationAttributes(dag.getAttributes());
+      }
+      
       if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */
         Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS);
         // use conf client side to pickup any proxy settings from dt-site.xml