You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/04 16:51:43 UTC
svn commit: r1584729 - in /pig/trunk/src/org/apache/pig:
backend/hadoop/executionengine/fetch/FetchContext.java
backend/hadoop/executionengine/fetch/FetchLauncher.java
tools/pigstats/PigStatusReporter.java
Author: cheolsoo
Date: Fri Apr 4 14:51:42 2014
New Revision: 1584729
URL: http://svn.apache.org/r1584729
Log:
PIG-3858: PigLogger/PigStatusReporter is not set for fetch tasks (lbendig via cheolsoo)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java?rev=1584729&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchContext.java Fri Apr 4 14:51:42 2014
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.hadoop.mapred.Counters;
+
+/**
+ * A dummy counter handling context for fetch tasks. It owns a
+ * {@link Counters} instance and serves counter lookups from it.
+ */
+public class FetchContext {
+ // Backing counter store; a new Counters object is created per FetchContext.
+ private Counters counters = new Counters();
+ // Delegates to Counters.findCounter(group, name) on the backing store.
+ public Counters.Counter getCounter(String group, String name) {
+ return counters.findCounter(group, name);
+ }
+ // Delegates to Counters.findCounter(name) on the backing store, keyed by enum.
+ public Counters.Counter getCounter(Enum<?> name) {
+ return counters.findCounter(name);
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1584729&r1=1584728&r2=1584729&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Apr 4 14:51:42 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -40,8 +41,9 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.EmptyPigStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTimeZone;
/**
@@ -128,6 +130,13 @@ public class FetchLauncher {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
+
+ boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setAggregate(aggregateWarning);
+ PigStatusReporter.getInstance().setFetchContext(new FetchContext());
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
}
private void runPipeline(POStore posStore) throws IOException {
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1584729&r1=1584728&r2=1584729&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri Apr 4 14:51:42 2014
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.Progressable;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -31,7 +32,10 @@ import org.apache.pig.classification.Int
public class PigStatusReporter extends StatusReporter implements Progressable {
private TaskInputOutputContext context;
+ private FetchContext fetchContext;
+
private static PigStatusReporter reporter = null;
+
/**
* Get singleton instance of the context
*/
@@ -41,35 +45,41 @@ public class PigStatusReporter extends S
}
return reporter;
}
-
+
public static void setContext(TaskInputOutputContext context) {
reporter = new PigStatusReporter(context);
}
-
+
private PigStatusReporter(TaskInputOutputContext context) {
this.context = context;
}
-
+
@Override
- public Counter getCounter(Enum<?> name) {
- return (context == null) ? null : context.getCounter(name);
+ public Counter getCounter(Enum<?> name) {
+ if (fetchContext != null) {
+ return fetchContext.getCounter(name);
+ }
+ return (context == null) ? null : context.getCounter(name);
}
@Override
public Counter getCounter(String group, String name) {
+ if (fetchContext != null) {
+ return fetchContext.getCounter(group, name);
+ }
return (context == null) ? null : context.getCounter(group, name);
}
@Override
public void progress() {
- if (context != null) {
+ if (fetchContext == null && context != null) {
context.progress();
}
}
@Override
public void setStatus(String status) {
- if (context != null) {
+ if (fetchContext == null && context != null) {
context.setStatus(status);
}
}
@@ -77,4 +87,13 @@ public class PigStatusReporter extends S
public float getProgress() {
return 0;
}
+
+ /**
+ * Sets a dummy counter handler for fetch tasks. While it is non-null, both getCounter overloads return its counters.
+ * @param fetchContext counter handler for fetch tasks; when set, progress() and setStatus() no longer call the task context
+ */
+ public void setFetchContext(FetchContext fetchContext) {
+ this.fetchContext = fetchContext;
+ }
+
}