Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC
svn commit: r1784237 [6/22] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+ public static class ATSEvent {
+ public ATSEvent(String pigAuditId, String callerId) {
+ this.pigScriptId = pigAuditId;
+ this.callerId = callerId;
+ }
+ String callerId;
+ String pigScriptId;
+ }
+ public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+ public static final String ENTITY_CALLERID = "callerId";
+ public static final String CALLER_CONTEXT = "PIG";
+ public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+ private static final Log log = LogFactory.getLog(PigATSClient.class.getName());
+ private static PigATSClient instance;
+ private static ExecutorService executor;
+ private TimelineClient timelineClient;
+
+ public static synchronized PigATSClient getInstance() {
+ if (instance == null) {
+ instance = new PigATSClient();
+ }
+ return instance;
+ }
+
+ private PigATSClient() {
+ if (executor == null) {
+ executor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+ YarnConfiguration yarnConf = new YarnConfiguration();
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(yarnConf);
+ timelineClient.start();
+ }
+ Utils.addShutdownHookWithPriority(new Runnable() {
+ @Override
+ public void run() {
+ timelineClient.stop();
+ executor.shutdownNow();
+ executor = null;
+ }
+ }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
+ log.info("Created ATS Hook");
+ }
+
+ public static String getPigAuditId(PigContext context) {
+ String auditId;
+ if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+ auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+ } else {
+ ScriptState ss = ScriptState.get();
+ String filename = ss.getFileName().isEmpty() ? "default" : new File(ss.getFileName()).getName();
+ auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+ }
+ return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+ }
+
+ public synchronized void logEvent(final ATSEvent event) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId(event.pigScriptId);
+ entity.setEntityType(ENTITY_TYPE);
+ entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId != null ? event.callerId : "default");
+ try {
+ timelineClient.putEntities(entity);
+ } catch (Exception e) {
+ log.info("Failed to submit plan to ATS: " + e.getMessage());
+ }
+ }
+ });
+ }
+}
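
For reference, the client added above is driven entirely through its static API. A minimal sketch of a hypothetical caller (only PigATSClient, ATSEvent, getPigAuditId and logEvent come from this commit; the wrapper class and its name are illustrative):

    import org.apache.pig.backend.hadoop.PigATSClient;
    import org.apache.pig.backend.hadoop.PigATSClient.ATSEvent;
    import org.apache.pig.impl.PigContext;

    public class ATSClientSketch {
        public static void logScriptStart(PigContext context, String callerId) {
            // Audit id is CALLER_CONTEXT + "-" + script file name + "-" + script id,
            // truncated to AUDIT_ID_MAX_LENGTH (128) characters.
            String auditId = PigATSClient.getPigAuditId(context);
            // getInstance() lazily starts the TimelineClient plus a single daemon
            // thread; logEvent() hands the entity off to that executor.
            PigATSClient.getInstance().logEvent(new ATSEvent(auditId, callerId));
        }
    }
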
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+
+/**
+ * Extends the Hadoop JobControl to remove the hardcoded sleep(5000).
+ * As most of its state is private, we have to use reflection.
+ *
+ * See <a href="https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java">JobControl.java (branch-0.23.1)</a>
+ *
+ */
+public class PigJobControl extends JobControl {
+ private static final Log log = LogFactory.getLog(PigJobControl.class);
+
+ private static Field runnerState;
+ private static Field jobsInProgress;
+ private static Field successfulJobs;
+ private static Field failedJobs;
+
+ private static Method failAllJobs;
+
+ private static Method checkState;
+ private static Method submit;
+
+ private static boolean initSuccessful;
+
+ static {
+ try {
+
+ runnerState = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("runnerState");
+ runnerState.setAccessible(true);
+ jobsInProgress = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("jobsInProgress");
+ jobsInProgress.setAccessible(true);
+ successfulJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("successfulJobs");
+ successfulJobs.setAccessible(true);
+ failedJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("failedJobs");
+ failedJobs.setAccessible(true);
+
+ failAllJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredMethod("failAllJobs", Throwable.class);
+ failAllJobs.setAccessible(true);
+
+ checkState = ControlledJob.class.getDeclaredMethod("checkState");
+ checkState.setAccessible(true);
+ submit = ControlledJob.class.getDeclaredMethod("submit");
+ submit.setAccessible(true);
+
+ initSuccessful = true;
+ } catch (Exception e) {
+ log.debug("falling back to default JobControl (not using Hadoop 0.23?)", e);
+ initSuccessful = false;
+ }
+ }
+
+ protected int timeToSleep;
+
+ /**
+ * Construct a job control for a group of jobs.
+ * @param groupName a name identifying this group
+ * @param timeToSleep the polling interval, in milliseconds, between job state checks
+ */
+ public PigJobControl(String groupName, int timeToSleep) {
+ super(groupName);
+ this.timeToSleep = timeToSleep;
+ }
+
+ public int getTimeToSleep() {
+ return timeToSleep;
+ }
+
+ public void setTimeToSleep(int timeToSleep) {
+ this.timeToSleep = timeToSleep;
+ }
+
+ private void setRunnerState(ThreadState state) {
+ try {
+ runnerState.set(this, state);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ private ThreadState getRunnerState() {
+ try {
+ return (ThreadState)runnerState.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private State checkState(ControlledJob j) {
+ try {
+ return (State)checkState.invoke(j);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private State submit(ControlledJob j) {
+ try {
+ return (State)submit.invoke(j);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private LinkedList<ControlledJob> getJobs(Field field) {
+ try {
+ return (LinkedList<ControlledJob>)field.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void failAllJobs(Throwable t) {
+ try {
+ failAllJobs.invoke(this, t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * The main loop for the thread.
+ * The loop does the following:
+ * Check the states of the running jobs
+ * Update the states of waiting jobs
+ * Submit the jobs in ready state
+ */
+ public void run() {
+ if (!initSuccessful) {
+ super.run();
+ return;
+ }
+ try {
+ setRunnerState(ThreadState.RUNNING);
+ while (true) {
+ while (getRunnerState() == ThreadState.SUSPENDED) {
+ try {
+ Thread.sleep(timeToSleep);
+ }
+ catch (Exception e) {
+ //TODO the thread was interrupted, do something!!!
+ }
+ }
+
+ synchronized(this) {
+ Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+ if (!it.hasNext()) {
+ stop();
+ }
+ while(it.hasNext()) {
+ ControlledJob j = it.next();
+
+ // TODO: Need to re-visit the following try...catch
+ // when Pig picks up a Hadoop release with MAPREDUCE-6762 applied
+ // as its dependency.
+ try {
+ log.debug("Checking state of job " + j);
+ } catch(NullPointerException npe) {
+ log.warn("Failed to get job name " +
+ "when checking state of job. " +
+ "Check if job status is null.", npe);
+ }
+
+ switch(checkState(j)) {
+ case SUCCESS:
+ getJobs(successfulJobs).add(j);
+ it.remove();
+ break;
+ case FAILED:
+ case DEPENDENT_FAILED:
+ getJobs(failedJobs).add(j);
+ it.remove();
+ break;
+ case READY:
+ submit(j);
+ break;
+ case RUNNING:
+ case WAITING:
+ //Do Nothing
+ break;
+ }
+ }
+ }
+
+ if (getRunnerState() != ThreadState.RUNNING &&
+ getRunnerState() != ThreadState.SUSPENDED) {
+ break;
+ }
+ try {
+ Thread.sleep(timeToSleep);
+ }
+ catch (Exception e) {
+ //TODO the thread was interrupted, do something!!!
+ }
+ if (getRunnerState() != ThreadState.RUNNING &&
+ getRunnerState() != ThreadState.SUSPENDED) {
+ break;
+ }
+ }
+ }catch(Throwable t) {
+ log.error("Error while trying to run jobs.",t);
+ //Mark all jobs as failed because we got something bad.
+ failAllJobs(t);
+ }
+ setRunnerState(ThreadState.STOPPED);
+ }
+
+
+}
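
Relative to stock JobControl the only API difference is the constructor's poll interval. A usage sketch under that assumption (the 100 ms value is an arbitrary example; the real interval is read from configuration by JobControlCompiler elsewhere in this commit):

    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.pig.backend.hadoop.PigJobControl;

    public class PigJobControlSketch {
        public static Thread launch(Job job) {
            // Poll every 100 ms instead of JobControl's hardcoded sleep(5000).
            PigJobControl jc = new PigJobControl("pig-jobs", 100);
            jc.addJob(job);
            // JobControl is a Runnable; Pig runs it on its own thread.
            Thread t = new Thread(jc, "JobControl");
            t.setDaemon(true);
            t.start();
            return t;
        }
    }
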
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Fri Feb 24 08:19:42 2017
@@ -17,8 +17,6 @@
package org.apache.pig.backend.hadoop.accumulo;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
@@ -303,24 +301,8 @@ public abstract class AbstractAccumuloSt
*/
protected void simpleUnset(Configuration conf,
Map<String, String> entriesToUnset) {
- try {
- Method unset = conf.getClass().getMethod("unset", String.class);
-
- for (String key : entriesToUnset.keySet()) {
- unset.invoke(conf, key);
- }
- } catch (NoSuchMethodException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (IllegalArgumentException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- log.error("Could not invoke Configuration.unset method", e);
- throw new RuntimeException(e);
+ for (String key : entriesToUnset.keySet()) {
+ conf.unset(key);
}
}
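
The reflection existed only for Hadoop versions predating Configuration.unset(); with Hadoop 2.x as the floor it collapses to a direct call. A self-contained sketch of the same idea (the key name is made up):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class UnsetSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            conf.set("example.key.to.remove", "value");
            Map<String, String> entriesToUnset = new HashMap<String, String>();
            entriesToUnset.put("example.key.to.remove", null);
            for (String key : entriesToUnset.keySet()) {
                conf.unset(key);   // plain method call on modern Hadoop
            }
            System.out.println(conf.get("example.key.to.remove"));   // prints null
        }
    }
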
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Fri Feb 24 08:19:42 2017
@@ -22,8 +22,6 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
import java.text.MessageFormat;
@@ -42,6 +40,7 @@ import java.util.zip.ZipOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
@@ -112,7 +111,7 @@ public class Utils {
// attempt to locate an existing jar for the class.
String jar = findContainingJar(my_class, packagedClasses);
if (null == jar || jar.isEmpty()) {
- jar = getJar(my_class);
+ jar = JarFinder.getJar(my_class);
updateMap(jar, packagedClasses);
}
@@ -200,41 +199,6 @@ public class Utils {
}
/**
- * Invoke 'getJar' on a JarFinder implementation. Useful for some job
- * configuration contexts (HBASE-8140) and also for testing on MRv2. First
- * check if we have HADOOP-9426. Lacking that, fall back to the backport.
- *
- * @param my_class
- * the class to find.
- * @return a jar file that contains the class, or null.
- */
- private static String getJar(Class<?> my_class) {
- String ret = null;
- String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
- Class<?> jarFinder = null;
- try {
- log.debug("Looking for " + hadoopJarFinder + ".");
- jarFinder = Class.forName(hadoopJarFinder);
- log.debug(hadoopJarFinder + " found.");
- Method getJar = jarFinder.getMethod("getJar", Class.class);
- ret = (String) getJar.invoke(null, my_class);
- } catch (ClassNotFoundException e) {
- log.debug("Using backported JarFinder.");
- ret = jarFinderGetJar(my_class);
- } catch (InvocationTargetException e) {
- // function was properly called, but threw it's own exception.
- // Unwrap it
- // and pass it on.
- throw new RuntimeException(e.getCause());
- } catch (Exception e) {
- // toss all other exceptions, related to reflection failure
- throw new RuntimeException("getJar invocation failed.", e);
- }
-
- return ret;
- }
-
- /**
* Returns the full path to the Jar containing the class. It always returns a
* JAR.
*
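
With HADOOP-9426 guaranteed on the classpath, the Class.forName probe above becomes dead code and JarFinder can be called directly. A minimal sketch (the class being located is just an example):

    import org.apache.hadoop.util.JarFinder;

    public class JarFinderSketch {
        public static void main(String[] args) {
            // Returns the jar containing the class, packaging one on the fly
            // if the class was loaded from a directory (handy under MRv2 tests).
            String jar = JarFinder.getJar(org.apache.hadoop.conf.Configuration.class);
            System.out.println(jar);
        }
    }
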
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Feb 24 08:19:42 2017
@@ -29,7 +29,6 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
public class ConfigurationUtil {
@@ -89,7 +88,7 @@ public class ConfigurationUtil {
// so build/classes/hadoop-site.xml contains such an entry. This prevents some tests from
// succeeding (they expect those files in HDFS), so we need to unset it in Hadoop 23.
// This should go away once MiniMRCluster fix the distributed cache issue.
- HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
+ localConf.unset(MRConfiguration.JOB_CACHE_FILES);
}
localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
Properties props = ConfigurationUtil.toProperties(localConf);
@@ -106,4 +105,14 @@ public class ConfigurationUtil {
}
}
}
+
+ /**
+ * Returns Properties containing all alternative names of the given property, each mapped
+ * to the same value - can be used to resolve deprecated configuration keys
+ * @param name the property name to expand
+ * @param value the value to associate with each alternative name
+ * @return Properties keyed by every known name (current and deprecated) of the property
+ */
+ public static Properties expandForAlternativeNames(String name, String value){
+ final Configuration config = new Configuration(false);
+ config.set(name,value);
+ return ConfigurationUtil.toProperties(config);
+ }
}
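
A sketch of what the new helper yields for a deprecated key; the exact set of alternative names depends on the Hadoop version on the classpath, so treat the printed result as illustrative:

    import java.util.Properties;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class ExpandNamesSketch {
        public static void main(String[] args) {
            // "fs.default.name" is deprecated in Hadoop 2.x in favor of
            // "fs.defaultFS"; the returned Properties should carry the value
            // under both spellings.
            Properties p = ConfigurationUtil.expandForAlternativeNames(
                    "fs.default.name", "hdfs://namenode:8020");
            System.out.println(p);
        }
    }
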
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Feb 24 08:19:42 2017
@@ -18,20 +18,20 @@
package org.apache.pig.backend.hadoop.datastorage;
-import java.net.URI;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
import java.util.Enumeration;
-import java.util.Map;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -40,8 +40,6 @@ import org.apache.pig.backend.datastorag
public class HDataStorage implements DataStorage {
- private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-
private FileSystem fs;
private Configuration configuration;
private Properties properties;
@@ -58,9 +56,10 @@ public class HDataStorage implements Dat
init();
}
+ @Override
public void init() {
// check if the name node is set; if not, we set local as the fallback
- String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+ String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY);
if (nameNode == null || nameNode.length() == 0) {
nameNode = "local";
}
@@ -76,14 +75,17 @@ public class HDataStorage implements Dat
}
}
+ @Override
public void close() throws IOException {
fs.close();
}
-
+
+ @Override
public Properties getConfiguration() {
return this.properties;
}
+ @Override
public void updateConfiguration(Properties newConfiguration)
throws DataStorageException {
// TODO sgroschupf 25Feb2008 this method is never called and
@@ -92,38 +94,40 @@ public class HDataStorage implements Dat
if (newConfiguration == null) {
return;
}
-
+
Enumeration<Object> newKeys = newConfiguration.keys();
-
+
while (newKeys.hasMoreElements()) {
String key = (String) newKeys.nextElement();
String value = null;
-
+
value = newConfiguration.getProperty(key);
-
+
fs.getConf().set(key,value);
}
}
-
+
+ @Override
public Map<String, Object> getStatistics() throws IOException {
Map<String, Object> stats = new HashMap<String, Object>();
long usedBytes = fs.getUsed();
stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString());
-
+
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
-
+
long rawCapacityBytes = dfs.getRawCapacity();
stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString());
-
+
long rawUsedBytes = dfs.getRawUsed();
stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString());
}
-
+
return stats;
}
-
+
+ @Override
public ElementDescriptor asElement(String name) throws DataStorageException {
if (this.isContainer(name)) {
return new HDirectory(this, name);
@@ -132,70 +136,82 @@ public class HDataStorage implements Dat
return new HFile(this, name);
}
}
-
+
+ @Override
public ElementDescriptor asElement(ElementDescriptor element)
throws DataStorageException {
return asElement(element.toString());
}
-
+
+ @Override
public ElementDescriptor asElement(String parent,
- String child)
+ String child)
throws DataStorageException {
return asElement((new Path(parent, child)).toString());
}
+ @Override
public ElementDescriptor asElement(ContainerDescriptor parent,
- String child)
+ String child)
throws DataStorageException {
return asElement(parent.toString(), child);
}
+ @Override
public ElementDescriptor asElement(ContainerDescriptor parent,
- ElementDescriptor child)
+ ElementDescriptor child)
throws DataStorageException {
return asElement(parent.toString(), child.toString());
}
- public ContainerDescriptor asContainer(String name)
+ @Override
+ public ContainerDescriptor asContainer(String name)
throws DataStorageException {
return new HDirectory(this, name);
}
-
+
+ @Override
public ContainerDescriptor asContainer(ContainerDescriptor container)
throws DataStorageException {
return new HDirectory(this, container.toString());
}
-
+
+ @Override
public ContainerDescriptor asContainer(String parent,
- String child)
+ String child)
throws DataStorageException {
return new HDirectory(this, parent, child);
}
+ @Override
public ContainerDescriptor asContainer(ContainerDescriptor parent,
- String child)
+ String child)
throws DataStorageException {
return new HDirectory(this, parent.toString(), child);
}
-
+
+ @Override
public ContainerDescriptor asContainer(ContainerDescriptor parent,
ContainerDescriptor child)
throws DataStorageException {
return new HDirectory(this, parent.toString(), child.toString());
}
-
+
+ @Override
public void setActiveContainer(ContainerDescriptor container) {
fs.setWorkingDirectory(new Path(container.toString()));
}
-
+
+ @Override
public ContainerDescriptor getActiveContainer() {
return new HDirectory(this, fs.getWorkingDirectory());
}
+ @Override
public boolean isContainer(String name) throws DataStorageException {
boolean isContainer = false;
Path path = new Path(name);
-
+
try {
if ((this.fs.exists(path)) && (! this.fs.isFile(path))) {
isContainer = true;
@@ -206,10 +222,11 @@ public class HDataStorage implements Dat
String msg = "Unable to check name " + name;
throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
}
-
+
return isContainer;
}
-
+
+ @Override
public HPath[] asCollection(String pattern) throws DataStorageException {
try {
FileStatus[] paths = this.fs.globStatus(new Path(pattern));
@@ -218,7 +235,7 @@ public class HDataStorage implements Dat
return new HPath[0];
List<HPath> hpaths = new ArrayList<HPath>();
-
+
for (int i = 0; i < paths.length; ++i) {
HPath hpath = (HPath)this.asElement(paths[i].getPath().toString());
if (!hpath.systemElement()) {
@@ -233,7 +250,7 @@ public class HDataStorage implements Dat
throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
}
}
-
+
public FileSystem getHFS() {
return fs;
}
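
Aside from the @Override annotations and whitespace cleanup, the substantive change above is replacing the hardcoded, deprecated "fs.default.name" with Hadoop's own constant. A one-liner to confirm what that constant resolves to on Hadoop 2.x:

    import org.apache.hadoop.fs.FileSystem;

    public class FsKeySketch {
        public static void main(String[] args) {
            // Prints "fs.defaultFS", the non-deprecated replacement
            // for "fs.default.name".
            System.out.println(FileSystem.FS_DEFAULT_NAME_KEY);
        }
    }
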
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigException;
@@ -76,8 +77,6 @@ public abstract class HExecutionEngine i
public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
- public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
- public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
public static final String LOCAL = "local";
protected PigContext pigContext;
@@ -203,8 +202,8 @@ public abstract class HExecutionEngine i
properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL);
}
properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL);
- properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
- properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+ properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x
+ properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
jc = getLocalConf();
JobConf s3Jc = getS3Conf();
@@ -220,24 +219,7 @@ public abstract class HExecutionEngine i
HKerberos.tryKerberosKeytabLogin(jc);
cluster = jc.get(MRConfiguration.JOB_TRACKER);
- nameNode = jc.get(FILE_SYSTEM_LOCATION);
- if (nameNode == null) {
- nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION);
- }
-
- if (cluster != null && cluster.length() > 0) {
- if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
- cluster = cluster + ":50020";
- }
- properties.setProperty(MRConfiguration.JOB_TRACKER, cluster);
- }
-
- if (nameNode != null && nameNode.length() > 0) {
- if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
- nameNode = nameNode + ":8020";
- }
- properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
- }
+ nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY);
LOG.info("Connecting to hadoop file system at: "
+ (nameNode == null ? LOCAL : nameNode));
@@ -369,7 +351,11 @@ public abstract class HExecutionEngine i
@Override
public void setProperty(String property, String value) {
Properties properties = pigContext.getProperties();
- properties.put(property, value);
+ if (Configuration.isDeprecated(property)) {
+ properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value));
+ } else {
+ properties.put(property, value);
+ }
}
@Override
@@ -378,6 +364,13 @@ public abstract class HExecutionEngine i
}
@Override
+ public void kill() throws BackendException {
+ if (launcher != null) {
+ launcher.kill();
+ }
+ }
+
+ @Override
public void killJob(String jobID) throws BackendException {
if (launcher != null) {
launcher.killJob(jobID, getJobConf());
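
The deprecation-aware setProperty above ties together Configuration.isDeprecated and the expandForAlternativeNames helper introduced earlier in this commit. A standalone sketch of the same pattern with a made-up key and value:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class SetPropertySketch {
        static void setProperty(Properties properties, String property, String value) {
            if (Configuration.isDeprecated(property)) {
                // Store the value under every known name so later lookups by
                // either the old or the new key succeed.
                properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value));
            } else {
                properties.put(property, value);
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            setProperty(props, "mapred.job.tracker", "localhost:8021");
            System.out.println(props);
        }
    }
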
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 24 08:19:42 2017
@@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig
public class HJob implements ExecJob {
private final Log log = LogFactory.getLog(getClass());
-
+
protected JOB_STATUS status;
protected PigContext pigContext;
protected FileSpec outFileSpec;
@@ -48,7 +48,7 @@ public class HJob implements ExecJob {
protected String alias;
protected POStore poStore;
private PigStats stats;
-
+
public HJob(JOB_STATUS status,
PigContext pigContext,
POStore store,
@@ -59,7 +59,7 @@ public class HJob implements ExecJob {
this.outFileSpec = poStore.getSFile();
this.alias = alias;
}
-
+
public HJob(JOB_STATUS status,
PigContext pigContext,
POStore store,
@@ -72,37 +72,41 @@ public class HJob implements ExecJob {
this.alias = alias;
this.stats = stats;
}
-
+
+ @Override
public JOB_STATUS getStatus() {
return status;
}
-
+
+ @Override
public boolean hasCompleted() throws ExecException {
return true;
}
-
+
+ @Override
public Iterator<Tuple> getResults() throws ExecException {
final LoadFunc p;
-
+
try{
- LoadFunc originalLoadFunc =
+ LoadFunc originalLoadFunc =
(LoadFunc)PigContext.instantiateFuncFromSpec(
outFileSpec.getFuncSpec());
-
- p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
+
+ p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
ConfigurationUtil.toConfiguration(
- pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
+ pigContext.getProperties()), outFileSpec.getFileName(), 0);
}catch (Exception e){
int errCode = 2088;
String msg = "Unable to get results for: " + outFileSpec;
throw new ExecException(msg, errCode, PigException.BUG, e);
}
-
+
return new Iterator<Tuple>() {
Tuple t;
boolean atEnd;
+ @Override
public boolean hasNext() {
if (atEnd)
return false;
@@ -120,6 +124,7 @@ public class HJob implements ExecJob {
return !atEnd;
}
+ @Override
public Tuple next() {
Tuple next = t;
if (next != null) {
@@ -136,6 +141,7 @@ public class HJob implements ExecJob {
return next;
}
+ @Override
public void remove() {
throw new RuntimeException("Removal not supported");
}
@@ -143,31 +149,38 @@ public class HJob implements ExecJob {
};
}
+ @Override
public Properties getConfiguration() {
return pigContext.getProperties();
}
+ @Override
public PigStats getStatistics() {
//throw new UnsupportedOperationException();
return stats;
}
+ @Override
public void completionNotification(Object cookie) {
throw new UnsupportedOperationException();
}
-
+
+ @Override
public void kill() throws ExecException {
throw new UnsupportedOperationException();
}
-
+
+ @Override
public void getLogs(OutputStream log) throws ExecException {
throw new UnsupportedOperationException();
}
-
+
+ @Override
public void getSTDOut(OutputStream out) throws ExecException {
throw new UnsupportedOperationException();
}
-
+
+ @Override
public void getSTDError(OutputStream error) throws ExecException {
throw new UnsupportedOperationException();
}
@@ -176,6 +189,7 @@ public class HJob implements ExecJob {
backendException = e;
}
+ @Override
public Exception getException() {
return backendException;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Feb 24 08:19:42 2017
@@ -32,7 +32,8 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.FuncSpec;
@@ -40,7 +41,6 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
protected Map<FileSpec, Exception> failureMap;
protected JobControl jc = null;
- class HangingJobKiller extends Thread {
+ protected class HangingJobKiller extends Thread {
public HangingJobKiller() {}
@Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
}
protected Launcher() {
- Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
// handle the windows portion of \r
if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
public void reset() {
failureMap = Maps.newHashMap();
totalHadoopTimeSpent = 0;
- jc = null;
}
/**
@@ -179,7 +177,7 @@ public abstract class Launcher {
String exceptionCreateFailMsg = null;
boolean jobFailed = false;
if (msgs.length > 0) {
- if (HadoopShims.isJobFailed(report)) {
+ if (report.getCurrentStatus() == TIPStatus.FAILED) {
jobFailed = true;
}
Set<String> errorMessageSet = new HashSet<String>();
@@ -261,11 +259,30 @@ public abstract class Launcher {
List<Job> runnJobs = jc.getRunningJobs();
for (Job j : runnJobs) {
- prog += HadoopShims.progressOfRunningJob(j);
+ prog += progressOfRunningJob(j);
}
return prog;
}
+ /**
+ * Returns the progress of a Job j which is part of a submitted JobControl
+ * object. The progress is for this Job alone, so it has to be scaled down by
+ * the number of jobs that are present in the JobControl.
+ *
+ * @param j The Job for which progress is required
+ * @return the progress of this Job as a fraction between 0.0 and 1.0
+ * @throws IOException
+ */
+ private static double progressOfRunningJob(Job j)
+ throws IOException {
+ org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+ try {
+ return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+ } catch (Exception ir) {
+ return 0;
+ }
+ }
+
public long getTotalHadoopTimeSpent() {
return totalHadoopTimeSpent;
}
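
For intuition on the inlined progress math: a job 80% through its maps and 20% through its reduces reports (0.8 + 0.2) / 2 = 0.5, and calculateProgress then sums those per-job fractions across the JobControl. A sketch mirroring the private helper above (the fallback of 0 on error matches it too):

    import org.apache.hadoop.mapreduce.Job;

    public class ProgressSketch {
        // Average of map and reduce progress, each reported by Hadoop
        // as a fraction in [0, 1].
        static double progressOf(Job mrJob) {
            try {
                return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
            } catch (Exception e) {
                return 0;   // treat lookup errors as "no progress yet"
            }
        }
    }
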
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -122,7 +123,8 @@ public class FetchLauncher {
poStore.setUp();
TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
- HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+ //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Fri Feb 24 08:19:42 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
}
if (outputCommitter.needsTaskCommit(context))
outputCommitter.commitTask(context);
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
@Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
}
writer = null;
}
- HadoopShims.commitOrCleanup(outputCommitter, context);
+ outputCommitter.commitJob(context);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Fri Feb 24 08:19:42 2017
@@ -22,43 +22,48 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
/**
* A special implementation of combiner used only for distinct. This combiner
* does not even parse out the records. It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
*/
public class DistinctCombiner {
- public static class Combine
+ public static class Combine
extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
private final Log log = LogFactory.getLog(getClass());
- ProgressableReporter pigReporter;
-
- /**
- * Configures the reporter
- */
+ private static boolean firstTime = true;
+
+ //@StaticDataCleanup
+ public static void staticDataCleanup() {
+ firstTime = true;
+ }
+
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- pigReporter = new ProgressableReporter();
+ Configuration jConf = context.getConfiguration();
+ // Avoid log spamming
+ if (firstTime) {
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+ firstTime = false;
+ }
}
-
+
/**
* The reduce function, which drops all duplicate values for a key.
*/
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
-
- pigReporter.setRep(context);
// Take the first value and the key and collect
// just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
NullableTuple val = iter.next();
context.write(key, val);
}
+
}
-
+
}
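
Since a DISTINCT pipeline makes every value under a key identical by construction, the combiner only needs to collapse each key group to a single representative. A plain-Java sketch of that collapse (names are illustrative):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class DistinctCollapseSketch {
        // Keep only the first value of a key group; the rest are duplicates,
        // so dropping them loses nothing and shrinks the shuffle.
        static <V> V collapse(Iterator<V> values) {
            return values.hasNext() ? values.next() : null;
        }

        public static void main(String[] args) {
            List<String> group = Arrays.asList("x", "x", "x");
            System.out.println(collapse(group.iterator()));   // prints x
        }
    }
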
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Fri Feb 24 08:19:42 2017
@@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i
return -1;
}
- long bytes = 0;
Path p = new Path(getLocationUri(sto));
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] lst = fs.listStatus(p);
+ return getPathSize(p, p.getFileSystem(conf));
+ }
+
+ private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+ long bytes = 0;
+ FileStatus[] lst = fs.listStatus(storePath);
if (lst != null) {
for (FileStatus status : lst) {
- bytes += status.getLen();
+ if (status.isFile()) {
+ if (status.getLen() > 0)
+ bytes += status.getLen();
+ }
+ else { // recursively count nested leaves' (files) sizes
+ bytes += getPathSize(status.getPath(), fs);
+ }
}
}
-
return bytes;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Fri Feb 24 08:19:42 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
return reducers;
}
- static long getTotalInputFileSize(Configuration conf,
+ public static long getTotalInputFileSize(Configuration conf,
List<POLoad> lds, Job job) throws IOException {
return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
}
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
/**
* Get the input size for as many inputs as possible. Inputs that do not report
* their size, and whose size Pig cannot look up itself, are excluded from this size.
- *
+ *
* @param conf Configuration
* @param lds List of POLoads
* @param job Job
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 08:19:42 2017
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -71,6 +71,7 @@ import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.PigJobControl;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
@@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
@@ -311,7 +312,7 @@ public class JobControlCompiler{
" should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
- JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+ JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
try {
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -384,7 +385,7 @@ public class JobControlCompiler{
ArrayList<Pair<String,Long>> counterPairs;
try {
- counters = HadoopShims.getCounters(job);
+ counters = MRJobStats.getCounters(job);
String groupName = getGroupName(counters.getGroupNames());
// In case that the counter group was not find, we need to find
@@ -702,7 +703,8 @@ public class JobControlCompiler{
// since this path would be invalid for the new job being created
pigContext.getProperties().remove("mapreduce.job.credentials.binary");
- conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+ conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
+ conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
// this is for unit tests since some don't create PigServer
@@ -1671,14 +1673,6 @@ public class JobControlCompiler{
if (distCachePath != null) {
log.info("Jar file " + url + " already in DistributedCache as "
+ distCachePath + ". Not copying to hdfs and adding again");
- // Path already in dist cache
- if (!HadoopShims.isHadoopYARN()) {
- // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
- // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
- // But path may only be in 'mapred.cache.files' and not be in
- // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
- DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
- }
}
else {
// REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1964,20 +1958,9 @@ public class JobControlCompiler{
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormat.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormat.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job, PigOutputFormat.class);
} else {
job.setOutputFormatClass(PigOutputFormat.class);
}
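
LazyOutputFormat ships with every Hadoop 2.x release, so the reflective probe is gone. A sketch of the direct configuration; TextOutputFormat stands in here for PigOutputFormat, which lives in Pig itself:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyOutputSketch {
        public static void configure(Job job) {
            // The wrapped format creates part files only once the first record
            // is written, avoiding empty outputs for empty partitions.
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        }
    }
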
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 08:19:42 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN
+ || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ // Bloom join is not implemented in mapreduce mode and falls back to regular join
curMROp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
@@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV
List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
List<List<InputSplit>> results = MapRedUtil
.getCombinePigSplits(splits,
- HadoopShims.getDefaultBlockSize(fs, path),
+ fs.getDefaultBlockSize(path),
conf);
numFiles += results.size();
} else {
@@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV
}else{
for(int i=0; i<transformPlans.size(); i++) {
eps1.add(transformPlans.get(i));
- flat1.add(true);
+ flat1.add(i == transformPlans.size() - 1);
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 24 08:19:42 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.pig.PigConfiguration;
@@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import com.google.common.collect.Lists;
+
/**
* Main class that launches pig for Map Reduce
*
*/
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -94,14 +101,30 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
+ public MapReduceLauncher() {
+ super();
+ Utils.addShutdownHookWithPriority(new HangingJobKiller(),
+ PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+ }
+
@Override
public void kill() {
try {
- log.debug("Receive kill signal");
- if (jc!=null) {
+ if (jc != null && jc.getRunningJobs().size() > 0) {
+ log.info("Received kill signal");
for (Job job : jc.getRunningJobs()) {
- HadoopShims.killJob(job);
+ org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+ try {
+ if (mrJob != null) {
+ mrJob.killJob();
+ }
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
log.info("Job " + job.getAssignedJobID() + " killed");
+ String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+ .format(Calendar.getInstance().getTime());
+ System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
}
}
} catch (Exception e) {
@@ -301,8 +324,7 @@ public class MapReduceLauncher extends L
// Now wait, till we are finished.
while(!jc.allFinished()){
- try { jcThread.join(sleepTime); }
- catch (InterruptedException e) {}
+ jcThread.join(sleepTime);
List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
@@ -321,11 +343,6 @@ public class MapReduceLauncher extends L
log.info("detailed locations: " + aliasLocation);
}
- if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
- log.info("More information at: http://" + jobTrackerLoc
- + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
- }
-
// update statistics for this job so jobId is set
MRPigStatsUtil.addJobStats(job);
MRScriptState.get().emitJobStartedNotification(
@@ -475,10 +492,6 @@ public class MapReduceLauncher extends L
for (Job job : succJobs) {
List<POStore> sts = jcc.getStores(job);
for (POStore st : sts) {
- if (Utils.isLocal(pc, job.getJobConf())) {
- HadoopShims.storeSchemaForLocal(job, st);
- }
-
if (!st.isTmpStore()) {
// create an "_SUCCESS" file in output location if
// output location is a filesystem dir
@@ -744,7 +757,7 @@ public class MapReduceLauncher extends L
@SuppressWarnings("deprecation")
void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
try {
- Counters counters = HadoopShims.getCounters(job);
+ Counters counters = MRJobStats.getCounters(job);
if (counters==null)
{
long nullCounterCount =
@@ -798,13 +811,13 @@ public class MapReduceLauncher extends L
throw new ExecException(backendException);
}
try {
- Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -822,5 +835,6 @@ public class MapReduceLauncher extends L
throw new ExecException(e);
}
}
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 24 08:19:42 2017
@@ -65,7 +65,10 @@ public class MapReduceOper extends Opera
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
public byte mapKeyType;
-
+
+ //record the map key types of all splittees
+ public byte[] mapKeyTypeOfSplittees;
+
//Indicates that the map plan creation
//is complete
boolean mapDone = false;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Feb 24 08:19:42 2017
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -580,18 +581,17 @@ class MultiQueryOptimizer extends MROpPl
}
private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
- boolean sameKeyType = true;
- for (MapReduceOper outer : splittees) {
- for (MapReduceOper inner : splittees) {
- if (inner.mapKeyType != outer.mapKeyType) {
- sameKeyType = false;
- break;
+ Set<Byte> keyTypes = new HashSet<Byte>();
+ for (MapReduceOper splittee : splittees) {
+ keyTypes.add(splittee.mapKeyType);
+ if (splittee.mapKeyTypeOfSplittees != null) {
+ for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
+ keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
}
}
- if (!sameKeyType) break;
- }
- return sameKeyType;
+ }
+ return keyTypes.size() == 1;
}
private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
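The rewritten check above replaces the quadratic pairwise comparison with a single set: collect every key type, including the types recorded for splittees that were already merged away, and require exactly one distinct value. The same logic as a standalone method, with the diff noise removed:

    // All splittees, and any splittees previously folded into them, must
    // agree on one map key type for the merge to be safe.
    private static boolean hasUniformKeyType(List<MapReduceOper> splittees) {
        Set<Byte> keyTypes = new HashSet<Byte>();
        for (MapReduceOper op : splittees) {
            keyTypes.add(op.mapKeyType);
            if (op.mapKeyTypeOfSplittees != null) {
                for (byte t : op.mapKeyTypeOfSplittees) {
                    keyTypes.add(t);
                }
            }
        }
        return keyTypes.size() == 1;
    }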
@@ -1035,10 +1035,20 @@ class MultiQueryOptimizer extends MROpPl
splitter.mapKeyType = sameKeyType ?
mergeList.get(0).mapKeyType : DataType.TUPLE;
+
+ setMapKeyTypeForSplitter(splitter, mergeList);
+
log.info("Requested parallelism of splitter: "
+ splitter.getRequestedParallelism());
}
+ private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
+ splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
+ for (int i = 0; i < mergeList.size(); i++) {
+ splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
+ }
+ }
+
private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
MapReduceOper splitter, POSplit splitOp) throws VisitorException {
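setMapKeyTypeForSplitter() is the other half of the fix: after a merge, the splitter remembers each splittee's key type in mapKeyTypeOfSplittees. A toy walk-through with DataType byte codes (values are illustrative only, not from this commit):

    // Two splittees with different key types get merged into one splitter.
    byte[] splitteeTypes = { DataType.CHARARRAY, DataType.INTEGER };
    // splitter.mapKeyType becomes DataType.TUPLE (sameKeyType is false), and
    // splitter.mapKeyTypeOfSplittees keeps {CHARARRAY, INTEGER}. A later
    // key-type check over this splitter therefore sees three distinct
    // types and correctly refuses to treat the keys as uniform.

Without the recorded array, a second optimization pass would see only the splitter's own TUPLE key type and could wrongly conclude that all keys matched.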
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 24 08:19:42 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +75,6 @@ public class PigCombiner {
PhysicalOperator[] roots;
PhysicalOperator leaf;
- PigContext pigContext = null;
private volatile boolean initialized = false;
//@StaticDataCleanup
@@ -91,9 +93,11 @@ public class PigCombiner {
Configuration jConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
- if (pigContext.getLog4jProperties()!=null)
- PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ Properties log4jProperties = (Properties) ObjectSerializer
+ .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+ if (log4jProperties != null) {
+ PropertyConfigurator.configure(log4jProperties);
+ }
UDFContext.getUDFContext().reset();
MapRedUtil.setupUDFContext(context.getConfiguration());
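The hunk above is part of removing the serialized PigContext from the task side: only the log4j Properties travel in the job conf, under PigImplConstants.PIG_LOG4J_PROPERTIES. A sketch of both halves, assuming the front end sets the value with Pig's ObjectSerializer (Base64-encoded Java serialization); the submission-side lines are an assumption for illustration, not quoted from this commit:

    // Front end, at job submission time (assumed; IOException handling omitted):
    Properties log4jProps = pigContext.getLog4jProperties();
    if (log4jProps != null) {
        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES,
                ObjectSerializer.serialize(log4jProps));
    }

    // Task side, as in the hunk above:
    Properties p = (Properties) ObjectSerializer.deserialize(
            conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
    if (p != null) {
        PropertyConfigurator.configure(p);   // log4j 1.x entry point
    }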
@@ -143,7 +147,7 @@ public class PigCombiner {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
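Same refactoring in the next hunk: per-job flags are read from the task's Configuration instead of a deserialized PigContext. The lookup is a one-liner; a Configuration.getBoolean() variant with an explicit default would be an equivalent sketch:

    // As committed: string comparison against the raw property value.
    boolean aggregateWarning =
            "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
    // Equivalent sketch with an explicit default:
    boolean aggregateWarning2 =
            context.getConfiguration().getBoolean("aggregate.warning", false);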
@@ -157,7 +161,7 @@ public class PigCombiner {
// tuples out of the getNext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPackage.getNext()
- if (pack.getPkgr() instanceof JoinPackager)
+ if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
@@ -268,7 +272,6 @@ public class PigCombiner {
pigReporter = null;
// Avoid OOM in Tez.
PhysicalOperator.setReporter(null);
- pigContext = null;
roots = null;
cp = null;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
private PhysicalOperator leaf;
- PigContext pigContext = null;
private volatile boolean initialized = false;
/**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
inIllustrator = inIllustrator(context);
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(job, pigContext);
+ SchemaTupleBackend.initialize(job);
- if (pigContext.getLog4jProperties()!=null)
- PropertyConfigurator.configure(pigContext.getLog4jProperties());
+ Properties log4jProperties = (Properties) ObjectSerializer
+ .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+ if (log4jProperties != null) {
+ PropertyConfigurator.configure(log4jProperties);
+ }
if (mp == null)
mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
- if (!pigContext.inIllustrator)
- store.setUp();
+ store.setUp();
}
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Feb 24 08:19:42 2017
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
private PhysicalOperator leaf;
- PigContext pigContext = null;
protected volatile boolean initialized = false;
private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
sJobConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
- pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(jConf, pigContext);
+ SchemaTupleBackend.initialize(jConf);
if (rp == null)
rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new MRTaskContext(context));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Fri Feb 24 08:19:42 2017
@@ -17,9 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-import java.util.Map;
-import java.util.WeakHashMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
@@ -41,7 +38,6 @@ public final class PigHadoopLogger imple
private PigStatusReporter reporter = null;
private boolean aggregate = false;
- private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
private PigHadoopLogger() {
}
@@ -68,11 +64,6 @@ public final class PigHadoopLogger imple
if (getAggregate()) {
if (reporter != null) {
- // log at least once
- if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
- log.warn(displayMessage);
- msgMap.put(o, displayMessage);
- }
if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
reporter.incrCounter(className, warningEnum.name(), 1);
}
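With the WeakHashMap of last-seen messages gone, aggregated warnings are tracked purely through counters; duplicate suppression in the task log is no longer attempted. A minimal sketch of the remaining warn() shape, assuming a counter-backed PigStatusReporter (not the full committed method):

    // Each aggregated warning bumps a counter keyed by source class and
    // warning enum; only the unaggregated path still writes to the log.
    public void warn(Object o, String msg, Enum warningEnum) {
        if (getAggregate()) {
            if (reporter != null
                    && (o instanceof EvalFunc || o instanceof LoadFunc
                        || o instanceof StoreFunc)) {
                reporter.incrCounter(o.getClass().getName(), warningEnum.name(), 1);
            }
        } else {
            log.warn(msg);
        }
    }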
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 24 08:19:42 2017
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
ArrayList<FileSpec> inputs;
ArrayList<ArrayList<OperatorKey>> inpTargets;
- PigContext pigContext;
try {
inputs = (ArrayList<FileSpec>) ObjectSerializer
.deserialize(conf.get(PIG_INPUTS));
inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
.deserialize(conf.get(PIG_INPUT_TARGETS));
- pigContext = (PigContext) ObjectSerializer.deserialize(conf
- .get("pig.pigContext"));
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
MapRedUtil.setupUDFContext(conf);
} catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
- if(!Utils.isLocal(pigContext, conf)) {
+ if(!Utils.isLocal(conf)) {
fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
}
@@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i),
- HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+ fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
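The last hunk drops the block-size shim as well: FileSystem has had a path-aware getDefaultBlockSize() since Hadoop 2, which matters on federated or viewfs deployments where the value can differ per mount point. A self-contained sketch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class BlockSizeSketch {
        // Default block size for the filesystem backing the given path.
        static long defaultBlockSize(Configuration conf, Path path) throws IOException {
            FileSystem fs = path.getFileSystem(conf);
            return fs.getDefaultBlockSize(path);
        }
    }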