You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drat.apache.org by GitBox <gi...@apache.org> on 2018/06/11 15:42:46 UTC

[GitHub] chrismattmann closed pull request #131: Changed the map and reduce function to work with XmlRpcWorkflowManager

chrismattmann closed pull request #131: Changed the map and reduce function to work with XmlRpcWorkflowManager
URL: https://github.com/apache/drat/pull/131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not display its
diff after the merge, so the full diff is reproduced below:

diff --git a/proteus/src/main/java/backend/FileConstants.java b/proteus/src/main/java/backend/FileConstants.java
index 651060e..3bbd6c2 100644
--- a/proteus/src/main/java/backend/FileConstants.java
+++ b/proteus/src/main/java/backend/FileConstants.java
@@ -19,10 +19,6 @@
 
 import org.apache.oodt.cas.metadata.util.PathUtils;
 
-import java.io.File;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  * Created by stevenfrancus on 10/13/15.
  */
@@ -41,6 +37,8 @@
   public static final String CLIENT_URL=PathUtils.replaceEnvVariables("[WORKFLOW_URL]");
   public static final String OPSUI_URL=PathUtils.replaceEnvVariables("[OPSUI_URL]");
   
+  public static final String MET_EXT_CONFIG_PATH =buildDratSubdirectoryPath("/deploy/extractors/code/default.cpr.conf");
+  public static final String CRAWLER_CONFIG = buildDratSubdirectoryPath("/deploy/crawler/policy/crawler-config.xml");
   public static final String SOLR_INDEXER_CONFIG = "SOLR_INDEXER_CONFIG";
   
   private static String getDratDirectory() {
diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java
index 10f1e20..1601370 100644
--- a/proteus/src/main/java/backend/ProcessDratWrapper.java
+++ b/proteus/src/main/java/backend/ProcessDratWrapper.java
@@ -21,16 +21,15 @@
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.oodt.cas.crawl.MetExtractorProductCrawler;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.pcs.util.WorkflowManagerUtils;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
-import org.python.modules.synchronize;
-import org.apache.oodt.cas.crawl.CrawlerLauncher;
 import org.apache.oodt.cas.filemgr.structs.Product;
 import org.apache.oodt.cas.filemgr.structs.ProductPage;
 import org.apache.oodt.cas.filemgr.structs.ProductType;
-import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient;
 import org.apache.oodt.cas.filemgr.tools.DeleteProduct;
 import org.apache.oodt.cas.filemgr.tools.SolrIndexer;
 import org.apache.oodt.cas.metadata.util.PathUtils;
@@ -38,11 +37,14 @@
 import org.apache.oodt.pcs.util.FileManagerUtils;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
+
+import drat.proteus.workflow.rest.DynamicWorkflowRequestWrapper;
+import drat.proteus.workflow.rest.WorkflowRestResource;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.Charset;
@@ -53,7 +55,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 import java.util.logging.Logger;
 
 public class ProcessDratWrapper extends GenericProcess
@@ -72,7 +73,10 @@
   private static final String RESET_CMD = "reset";
   private static final String STATUS_IDLE = "idle";
 
-  private static final String MAPPER_TASK = "urn:drat:RatCodeAudit";
+  private static final String MAPPER_TASK = "RatCodeAudit";
+  
+  private static final String MAPPER_TASK_ID = "urn:drat:MimePartitioner";
+  private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator";
   private static final String[] WIPE_TYPES = { "RatLog", "GenericFile",
       "RatAggregateLog" };
 
@@ -111,7 +115,34 @@ public synchronized void setStatus(String status) {
 
   @Override
   public void crawl() throws Exception {
-    simpleDratExec(CRAWL_CMD, this.path);
+      DratLog crawlLog = new DratLog("CRAWLING");
+      try{
+          setStatus(CRAWL_CMD);
+
+          crawlLog.logInfo("Configuring");
+          String beanRepo = System.getProperty("org.apache.oodt.cas.crawl.bean.repo",
+                  FileConstants.CRAWLER_CONFIG);
+          String crawlerId = "MetExtractorProductCrawler";
+          System.setProperty("DRAT_EXCLUDE","");
+          FileSystemXmlApplicationContext appContext = new FileSystemXmlApplicationContext("file:"+beanRepo);
+
+          MetExtractorProductCrawler crawler = new MetExtractorProductCrawler();
+          crawler.setApplicationContext(appContext);
+          crawler.setId(crawlerId);
+          crawler.setMetExtractor("org.apache.oodt.cas.metadata.extractors.CopyAndRewriteExtractor");
+          crawler.setMetExtractorConfig(FileConstants.MET_EXT_CONFIG_PATH);
+          crawler.setFilemgrUrl(FileConstants.FILEMGR_URL);
+          crawler.setClientTransferer("org.apache.oodt.cas.filemgr.datatransfer.InPlaceDataTransferFactory");
+          crawler.setPreCondIds(Arrays.asList("RegExExcludeComparator"));
+          crawler.setProductPath(this.path);
+          crawlLog.logInfo("STARTING ",null);
+          crawler.crawl();
+          crawlLog.logInfo("COMPLETED",null);
+      }catch (Exception ex) {
+          crawlLog.logSevere("ERROR ",ex.getLocalizedMessage());
+          ex.printStackTrace();
+          throw ex;
+      }
   }
 
   @Override
@@ -120,6 +151,7 @@ public void index() throws IOException, DratWrapperException, InstantiationExcep
   }
   
   private synchronized void solrIndex() throws InstantiationException, SolrServerException, IOException {
+      setStatus(INDEX_CMD);
       DratLog idl = new DratLog("INDEXING");
       idl.logInfo("Starting", null);
       System.setProperty(FileConstants.SOLR_INDEXER_CONFIG,FileConstants.SOLR_INDEXER_CONFIG_PATH);
@@ -128,17 +160,43 @@ private synchronized void solrIndex() throws InstantiationException, SolrServerE
       sIndexer.commit();
       sIndexer.optimize();
       idl.logInfo("Completed",null);
-      
   }
 
   @Override
-  public void map() throws IOException, DratWrapperException {
-    simpleDratExec(MAP_CMD);
+  public void map() {
+    setStatus(MAP_CMD);
+    DratLog mapLog = new DratLog("MAPPING");
+    WorkflowRestResource restResource = new WorkflowRestResource();
+    DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
+    requestBody.taskIds = new ArrayList<>();
+    requestBody.taskIds.add(MAPPER_TASK_ID);
+    LOG.info("STARTING MAPPING");
+    mapLog.logInfo("STARTING", " (dynamic workflow with task "+MAPPER_TASK_ID);
+    String resp = restResource.performDynamicWorkFlow(requestBody);
+    if(resp.equals("OK")) {
+        mapLog.logInfo("STARTED SUCCESSFULLY, "+MAPPER_TASK_ID+" dynamic workflow");
+    }else {
+        mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+    }
   }
 
   @Override
-  public void reduce() throws IOException, DratWrapperException {
-    simpleDratExec(REDUCE_CMD);
+  public void reduce() throws IOException {
+    setStatus(REDUCE_CMD);
+    DratLog mapLog = new DratLog("REDUCING");
+    WorkflowRestResource restResource = new WorkflowRestResource();
+    DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper();
+    requestBody.taskIds = new ArrayList<>();
+    requestBody.taskIds.add(REDUCE_TASK_ID);
+    LOG.info("STARTING REDUCING");
+    mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID);
+    String resp = (String)restResource.performDynamicWorkFlow(requestBody);
+    if(resp.equals("OK")) {
+        mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow");
+    }else {
+        mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp);
+        throw new IOException(resp);
+    }
   }
 
   @Override
@@ -205,11 +263,13 @@ public void go() throws Exception {
     // don't run reduce until all maps are done
     while (mapsStillRunning()) {
       Thread.sleep(DRAT_PROCESS_WAIT_DURATION);
+      LOG.info("MAP STILL RUNNING");
     }
     // you're not done until the final log is generated.
     while (!hasAggregateRatLog()) {
       try {
         reduce();
+        LOG.info("REDUCE STILL RUNNING");
       } catch (IOException e) {
         LOG.warning("Fired reduce off before mappers were done. Sleeping: ["
             + String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000)
@@ -221,30 +281,6 @@ public void go() throws Exception {
     setStatus(STATUS_IDLE);
   }
 
-  public synchronized void simpleDratExec(String command, String... options)
-      throws IOException, DratWrapperException {
-    setStatus(command);
-    String args[] = { FileConstants.DRAT_PATH, command };
-    String all[] = (String[]) ArrayUtils.addAll(args, options);
-    String cmd = Joiner.on(" ").join(all);
-
-    String output = null;
-    try {
-      output = execToString(cmd);
-    } catch (IOException e) {
-      LOG.warning("Executing DRAT cmd: [" + command + "]: command line: [" + cmd
-          + "] generated non-zero exit status. output is: [" + output
-          + "]: Message: " + e.getLocalizedMessage());
-      throw e;
-    } catch (Exception e) {
-      LOG.warning("Exception executing " + command + ". Output: [" + output
-          + "]: Message: " + e.getLocalizedMessage());
-      throw new IOException(e.getLocalizedMessage());
-    }
-
-    LOG.info(
-        "Command: [" + command + "] completed normally. Output is: " + output);
-  }
 
   private synchronized boolean hasAggregateRatLog() {
     int numLogs = -1;
@@ -257,14 +293,13 @@ private synchronized boolean hasAggregateRatLog() {
   }
 
   private boolean mapsStillRunning() throws Exception {
-    String args[] = { FileConstants.WORKFLOW_PATH, "--url",
-        "http://localhost:9001", "--operation", "--getWorkflowInsts" };
-    String cmd = Joiner.on(" ").join(args);
-    LOG.info("Maps Still Running: Executing: " + cmd);
-    String output = execToString(cmd);
-    LOG.info("Output from maps still running: " + output);
-    List<WorkflowItem> items = parseWorkflows(output);
-    return stillRunning(items);
+    WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL);
+    List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances();
+    for(WorkflowInstance instance : workflowInstances){
+      LOG.info("Running Instances : id: "+instance.getId()
+              +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName());
+    }
+    return stillRunning(workflowInstances);
   }
 
   @VisibleForTesting
@@ -311,35 +346,33 @@ private boolean mapsStillRunning() throws Exception {
   }
 
   @VisibleForTesting
-  protected boolean stillRunning(List<WorkflowItem> items) {
-    List<WorkflowItem> mapperItems = filterMappers(items);
+  protected boolean stillRunning(List<WorkflowInstance> instances) {
+    List<WorkflowInstance> mapperInstances = filterMappers(instances);
     LOG.info("Checking mappers: inspecting ["
-        + String.valueOf(mapperItems.size()) + "] mappers.");
-    for (WorkflowItem mapperItem : mapperItems) {
-      if (isRunning(mapperItem.getStatus())) {
-        LOG.info("Mapper: [" + mapperItem.getId() + "] still running.");
+            + String.valueOf(mapperInstances.size()) + "] mappers.");
+    for (WorkflowInstance mapperInstance : mapperInstances) {
+      if (isRunning(mapperInstance.getState().getName())) {
+        LOG.info("Mapper: [" + mapperInstance.getId() + "] still running.");
         return true;
       }
     }
-
     return false;
   }
 
   @VisibleForTesting
-  protected List<WorkflowItem> filterMappers(List<WorkflowItem> items) {
-    List<WorkflowItem> mappers = new ArrayList<WorkflowItem>();
-    if (items != null && items.size() > 0) {
-      for (WorkflowItem item : items) {
-        if (item.getCurrentTask().equals(MAPPER_TASK)) {
-          LOG.info("Adding mapper: [" + item.getCurrentTask() + "]");
-          mappers.add(item);
-        } else {
-          LOG.info("Filtering task: [" + item.getCurrentTask() + "]");
-        }
+  protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){
+      List<WorkflowInstance> mappers = new ArrayList<>();
+      if(instances!=null && instances.size()>0){
+          for(WorkflowInstance instance:instances){
+              if(instance.getCurrentTask().equals(MAPPER_TASK)){
+                  LOG.info("Adding mapper: [" + instance.getCurrentTask() + "]");
+                  mappers.add(instance);
+              }else{
+                  LOG.info("Filtering task: [" + instance.getCurrentTask() + "]");
+              }
+          }
       }
-    }
-
-    return mappers;
+      return mappers;
   }
 
   @VisibleForTesting
@@ -448,7 +481,7 @@ private synchronized void wipeSolrCore(String coreName) {
   }
   
   private class DratLog{
-      private static final String MODULE = "DRAT_LOG"; 
+      private static final String MODULE = "DRAT_LOG";
       long startTime =0;
       private long lastActionTime=-1L;
       private long timeDiff  =-1L;
diff --git a/proteus/src/main/java/drat/proteus/WicketApplication.java b/proteus/src/main/java/drat/proteus/WicketApplication.java
index cf46ca2..d84d164 100644
--- a/proteus/src/main/java/drat/proteus/WicketApplication.java
+++ b/proteus/src/main/java/drat/proteus/WicketApplication.java
@@ -27,6 +27,7 @@
 
 import drat.proteus.rest.DratRestResource;
 import drat.proteus.rest.ServicesRestResource;
+import drat.proteus.workflow.rest.WorkflowRestResource;
 
 import org.apache.wicket.markup.html.WebPage;
 import org.apache.wicket.protocol.http.WebApplication;
@@ -73,6 +74,14 @@ public IResource getResource() {
         return resource;
       }
     });
+    
+    mountResource("/workflowservice",new ResourceReference("restReference"){
+        WorkflowRestResource resource = new WorkflowRestResource();
+        @Override
+        public IResource getResource() {
+          return resource;
+        }
+    });
     mountPage("/workflow", DratWorkflow.class);
 
     doImageMounts(
diff --git a/proteus/src/main/java/drat/proteus/rest/DratRestResource.java b/proteus/src/main/java/drat/proteus/rest/DratRestResource.java
index 6b1562c..d6079d3 100644
--- a/proteus/src/main/java/drat/proteus/rest/DratRestResource.java
+++ b/proteus/src/main/java/drat/proteus/rest/DratRestResource.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.logging.Logger;
 
 import org.wicketstuff.rest.annotations.MethodMapping;
 import org.wicketstuff.rest.annotations.parameters.RequestBody;
@@ -34,7 +35,7 @@
 import backend.ProcessOodtWrapper;
 
 public class DratRestResource extends AbstractRestResource<GsonWebSerialDeserial> {
-
+  private static final Logger LOG = Logger.getLogger(DratRestResource.class.getName());
   private static final long serialVersionUID = -5885535059043262485L;
   public AbstractOodtWrapper oodtWrapper;
   public AbstractDratWrapper dratWrapper;
diff --git a/proteus/src/main/java/drat/proteus/workflow/rest/DynamicWorkflowRequestWrapper.java b/proteus/src/main/java/drat/proteus/workflow/rest/DynamicWorkflowRequestWrapper.java
new file mode 100644
index 0000000..f3a917c
--- /dev/null
+++ b/proteus/src/main/java/drat/proteus/workflow/rest/DynamicWorkflowRequestWrapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package drat.proteus.workflow.rest;
+
+import java.util.List;
+
+/**
+ * This is the wrapper which wraps the dynamicWorkflow request body
+ * of execute dynamic workflow rest api
+ */
+public class DynamicWorkflowRequestWrapper {
+    public List<String> taskIds;
+}
diff --git a/proteus/src/main/java/drat/proteus/workflow/rest/WorkflowRestResource.java b/proteus/src/main/java/drat/proteus/workflow/rest/WorkflowRestResource.java
new file mode 100644
index 0000000..21be1bf
--- /dev/null
+++ b/proteus/src/main/java/drat/proteus/workflow/rest/WorkflowRestResource.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package drat.proteus.workflow.rest;
+
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.metadata.util.PathUtils;
+import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient;
+import org.apache.oodt.pcs.util.WorkflowManagerUtils;
+import org.wicketstuff.rest.annotations.MethodMapping;
+import org.wicketstuff.rest.annotations.parameters.RequestBody;
+import org.wicketstuff.rest.contenthandling.json.webserialdeserial.GsonWebSerialDeserial;
+import org.wicketstuff.rest.resource.AbstractRestResource;
+import org.wicketstuff.rest.utils.http.HttpMethod;
+
+import backend.FileConstants;
+
+/**
+ * This is where all the rest apis related to workflow in drat are declared
+ * */
+public class WorkflowRestResource extends AbstractRestResource<GsonWebSerialDeserial> {
+    
+    
+    private static final long serialVersionUID = -5885885059043262485L;
+    XmlRpcWorkflowManagerClient wm;
+    
+    
+    
+    private static final Logger LOG = Logger.getLogger(WorkflowRestResource.class.getName());
+    public WorkflowRestResource() {
+        super(new GsonWebSerialDeserial());
+        wm =  new WorkflowManagerUtils(PathUtils.replaceEnvVariables(FileConstants.CLIENT_URL)).getClient();
+    }
+    
+    @MethodMapping(value = "/dynamic", httpMethod = HttpMethod.POST)
+    public String performDynamicWorkFlow(@RequestBody DynamicWorkflowRequestWrapper requestBody ) {
+   
+        try {
+            Metadata metaData = new Metadata();
+            LOG.info(requestBody.taskIds.get(0));
+            wm.executeDynamicWorkflow(requestBody.taskIds,metaData);
+            return "OK";
+        }catch(IOException ex) {
+            LOG.info("Workflow Service Error " + ex.getMessage());
+            return "Connectiing to Server Eroor";
+        }catch(Exception ex) {
+            LOG.info("Workflow Service Error " + ex.getMessage());
+            return "Failed to connect to client Url";
+        }
+    }
+    
+    
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services