You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/04 16:51:43 UTC

svn commit: r1584729 - in /pig/trunk/src/org/apache/pig: backend/hadoop/executionengine/fetch/FetchContext.java backend/hadoop/executionengine/fetch/FetchLauncher.java tools/pigstats/PigStatusReporter.java

Author: cheolsoo
Date: Fri Apr  4 14:51:42 2014
New Revision: 1584729

URL: http://svn.apache.org/r1584729
Log:
PIG-3858: PigLogger/PigStatusReporter is not set for fetch tasks (lbendig via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java?rev=1584729&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java Fri Apr  4 14:51:42 2014
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.hadoop.mapred.Counters;
+
+/**
+ * A dummy counter handling context for fetch tasks
+ *
+ */
+public class FetchContext {
+
+    private Counters counters = new Counters();
+
+    public Counters.Counter getCounter(String group, String name) {
+        return counters.findCounter(group, name);
+    }
+
+    public Counters.Counter getCounter(Enum<?> name) {
+        return counters.findCounter(name);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1584729&r1=1584728&r2=1584729&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Apr  4 14:51:42 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -40,8 +41,9 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.EmptyPigStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.joda.time.DateTimeZone;
 
 /**
@@ -128,6 +130,13 @@ public class FetchLauncher {
             // ensure that the internal timezone is uniformly in UTC offset style
             DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
         }
+        
+        boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+        pigHadoopLogger.setAggregate(aggregateWarning);
+        PigStatusReporter.getInstance().setFetchContext(new FetchContext());
+        pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+        PhysicalOperator.setPigLogger(pigHadoopLogger);
     }
 
     private void runPipeline(POStore posStore) throws IOException {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1584729&r1=1584728&r2=1584729&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri Apr  4 14:51:42 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
@@ -31,7 +32,10 @@ import org.apache.pig.classification.Int
 public class PigStatusReporter extends StatusReporter implements Progressable {
 
     private TaskInputOutputContext context;
+    private FetchContext fetchContext;
+
     private static PigStatusReporter reporter = null;
+
     /**
      * Get singleton instance of the context
      */
@@ -41,35 +45,41 @@ public class PigStatusReporter extends S
         }
         return reporter;
     }
-    
+
     public static void setContext(TaskInputOutputContext context) {
         reporter = new PigStatusReporter(context);
     }
-    
+
     private PigStatusReporter(TaskInputOutputContext context) {
         this.context = context;
     }
-    
+
     @Override
-    public Counter getCounter(Enum<?> name) {        
-        return (context == null) ? null : context.getCounter(name);
+    public Counter getCounter(Enum<?> name) {
+        if (fetchContext != null) {
+            return fetchContext.getCounter(name);  
+        }
+        return (context == null) ? null : context.getCounter(name); 
     }
 
     @Override
     public Counter getCounter(String group, String name) {
+        if (fetchContext != null) {
+            return fetchContext.getCounter(group, name);
+        }
         return (context == null) ? null : context.getCounter(group, name);
     }
 
     @Override
     public void progress() {
-        if (context != null) {
+        if (fetchContext == null && context != null) {
             context.progress();
         }
     }
 
     @Override
     public void setStatus(String status) {
-        if (context != null) {
+        if (fetchContext == null && context != null) {
             context.setStatus(status);
         }
     }
@@ -77,4 +87,13 @@ public class PigStatusReporter extends S
     public float getProgress() {
         return 0;
     }
+
+    /**
+     * Sets a dummy counter handler for fetch tasks
+     * @param fetchContext
+     */
+    public void setFetchContext(FetchContext fetchContext) {
+        this.fetchContext = fetchContext;
+    }
+
 }