You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [12/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.localshuffle.LocalShuffle;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link LocalMergedInput} is an {@link Input} which shuffles intermediate
+ * sorted data using a {@link LocalShuffle}, merges it and provides
+ * key/{@code <values>} to the consumer.
+ *
+ * <p>This class keeps its own {@code task}, fixed in the constructor and used
+ * by {@link #initialize(Configuration, Master)}. Calling
+ * {@link #setTask(TezTask)} only updates the parent's private copy and does
+ * not change which task this class shuffles for.
+ */
+public class LocalMergedInput extends ShuffledMergedInput {
+
+  // The merged-data iterator lives in the inherited, package-private
+  // ShuffledMergedInput#rIter field. It is deliberately NOT redeclared here:
+  // hiding it would leave the parent's field null for any code that reads it
+  // through a ShuffledMergedInput reference.
+
+  private final TezTask task;
+
+  private Configuration conf;
+  private CombineInput raw;
+
+  @Inject
+  public LocalMergedInput(
+      @Assisted TezTask task
+      ) {
+    super(task);
+    this.task = task;
+  }
+
+  /**
+   * Runs a {@link LocalShuffle} for this task and wraps the resulting
+   * iterator in a {@link CombineInput}.
+   *
+   * @param conf configuration handed to the {@link LocalShuffle}
+   * @param master task master; cast to {@link TezTaskReporter}, so it must be one
+   */
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    this.conf = conf;
+
+    LocalShuffle shuffle =
+        new LocalShuffle(task, this.conf, (TezTaskReporter)master);
+    rIter = shuffle.run();
+    raw = new CombineInput(rIter);
+  }
+
+  @Override
+  public boolean hasNext() throws IOException, InterruptedException {
+    return raw.hasNext();
+  }
+
+  @Override
+  public Object getNextKey() throws IOException, InterruptedException {
+    return raw.getNextKey();
+  }
+
+  @Override
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return raw.getNextValues();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return raw.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    raw.close();
+  }
+
+  /** @return the iterator produced by the local shuffle, or null before initialize. */
+  @Override
+  public TezRawKeyValueIterator getIterator() {
+    return rIter;
+  }
+  
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.input;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.shuffle.impl.Shuffle;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link ShuffledMergedInput} is an {@link Input} which shuffles intermediate
+ * sorted data, merges it and provides key/{@code <values>} to the consumer.
+ */
+public class ShuffledMergedInput implements Input {
+
+  static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
+  TezRawKeyValueIterator rIter = null;
+
+  private TezTask task;
+  
+  private Configuration conf;
+  private CombineInput raw;
+
+  /**
+   * @param task the task this input belongs to. It is retained so that
+   *        {@link #initialize(Configuration, Master)} can build the shuffle
+   *        without a separate {@link #setTask(TezTask)} call.
+   */
+  @Inject
+  public ShuffledMergedInput(
+      @Assisted TezTask task
+      ) {
+    // Keep the injected task: initialize() dereferences it, and dropping it
+    // here left the field null unless setTask() happened to be called.
+    this.task = task;
+  }
+
+  /** Replaces the task used by a later {@link #initialize} call. */
+  public void setTask(TezTask task) {
+    this.task = task;
+  }
+  
+  /**
+   * Runs the shuffle for this task and wraps the merged iterator in a
+   * {@link CombineInput}.
+   *
+   * @param conf configuration; also supplies
+   *        {@code TezJobConfig.TEZ_ENGINE_TASK_INDEGREE} for the shuffle
+   * @param master task master; cast to {@link TezTaskReporter}, so it must be one
+   */
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    this.conf = conf;
+    
+    Shuffle shuffle = 
+      new Shuffle(
+          task, this.conf,      
+          this.conf.getInt(
+              TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
+              TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE),
+          (TezTaskReporter)master, 
+          task.getCombineProcessor());
+    rIter = shuffle.run();
+    
+    raw = new CombineInput(rIter);
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    return raw.hasNext();
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return raw.getNextKey();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return raw.getNextValues();
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return raw.getProgress();
+  }
+
+  public void close() throws IOException {
+    raw.close();
+  }
+
+  /** @return the iterator produced by the shuffle, or null before initialize. */
+  public TezRawKeyValueIterator getIterator() {
+    return rIter;
+  }
+  
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
+import org.apache.tez.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link InMemorySortedOutput} is an {@link Output} (a {@link SortingOutput})
+ * that forwards every key/value pair written to it to an
+ * {@link InMemoryShuffleSorter}.
+ *
+ * <p>NOTE(review): an earlier description said the output is persisted to a
+ * file. That looks copied from OnFileSortedOutput; confirm the actual
+ * behaviour against InMemoryShuffleSorter.
+ */
+public class InMemorySortedOutput implements SortingOutput {
+
+  /** Sorter receiving all writes; also exposed through {@link #getSorter()}. */
+  protected InMemoryShuffleSorter sorter;
+
+  @Inject
+  public InMemorySortedOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    this.sorter = new InMemoryShuffleSorter(task);
+  }
+
+  /** Delegates initialization to the sorter. */
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+    this.sorter.initialize(conf, master);
+  }
+
+  /** Delegates the task assignment to the sorter. */
+  public void setTask(TezTask task) {
+    this.sorter.setTask(task);
+  }
+
+  /** Hands one key/value pair to the sorter. */
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    this.sorter.write(key, value);
+  }
+
+  /** Calls {@code flush()} and then {@code close()} on the sorter. */
+  public void close() throws IOException, InterruptedException {
+    this.sorter.flush();
+    this.sorter.close();
+  }
+
+  /** @return whatever output context the sorter reports */
+  @Override
+  public OutputContext getOutputContext() {
+    return this.sorter.getOutputContext();
+  }
+
+  /** @return the underlying in-memory sorter */
+  public InMemoryShuffleSorter getSorter()  {
+    return this.sorter;
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link OnFileSortedOutput} variant for local execution. After the sorter
+ * has been closed, its output file is moved to the path that
+ * {@link TezTaskOutput#getInputFileForWrite} returns for this attempt's task.
+ */
+public class LocalOnFileSorterOutput extends OnFileSortedOutput {
+
+  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
+  
+  @Inject
+  public LocalOnFileSorterOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    super(task);
+  }
+
+  /**
+   * Flushes and closes the sorter, then renames its output file on the local
+   * file system.
+   *
+   * @throws IOException if closing the sorter fails, or if the file system
+   *         reports that the rename did not succeed
+   */
+  @Override
+  public void close() throws IOException, InterruptedException {
+    LOG.info("XXX close");
+
+    super.close();
+
+
+    TezTaskOutput mapOutputFile = sorter.getMapOutput();
+    FileSystem localFs = FileSystem.getLocal(mapOutputFile.getConf());
+
+    Path src = mapOutputFile.getOutputFile();
+    Path dst = 
+        mapOutputFile.getInputFileForWrite(
+            sorter.getTaskAttemptId().getTaskID(),
+            localFs.getFileStatus(src).getLen());
+
+    // FileSystem#rename reports many failures by returning false instead of
+    // throwing. Without this check a failed move would go unnoticed.
+    if (!localFs.rename(src, dst)) {
+      throw new IOException("Failed to rename " + src + " to " + dst);
+    }
+    LOG.info("XXX renaming src = " + src + ", dst = " + dst);
+  }
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.records.OutputContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link OnFileSortedOutput} is an {@link Output} (a {@link SortingOutput})
+ * that forwards every key/value pair written to it to a {@link DefaultSorter},
+ * held as an {@link ExternalSorter}. As the name suggests, the sorted output
+ * is expected to end up in a file; that part is handled by the sorter.
+ */
+public class OnFileSortedOutput implements SortingOutput {
+
+  /** Sorter receiving all writes; visible to subclasses. */
+  protected ExternalSorter sorter;
+
+  @Inject
+  public OnFileSortedOutput(
+      @Assisted TezTask task
+      ) throws IOException {
+    this.sorter = new DefaultSorter(task);
+  }
+
+  /** Delegates initialization to the sorter. */
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+    this.sorter.initialize(conf, master);
+  }
+
+  /** Delegates the task assignment to the sorter. */
+  public void setTask(TezTask task) {
+    this.sorter.setTask(task);
+  }
+
+  /** Hands one key/value pair to the sorter. */
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    this.sorter.write(key, value);
+  }
+
+  /** Calls {@code flush()} and then {@code close()} on the sorter. */
+  public void close() throws IOException, InterruptedException {
+    this.sorter.flush();
+    this.sorter.close();
+  }
+
+  /**
+   * This output does not expose an {@link OutputContext}.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.common.TezTask;
+
+/**
+ * Factory producing the {@link Input} used by a task.
+ */
+public interface InputFactory {
+
+  /**
+   * @param task the task the input is for
+   * @return the {@link Input} to use for {@code task}
+   */
+  Input create(TezTask task);
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Output;
+import org.apache.tez.common.TezTask;
+
+/**
+ * Factory producing the {@link Output} used by a task.
+ */
+public interface OutputFactory {
+
+  /**
+   * @param task the task the output is for
+   * @return the {@link Output} to use for {@code task}
+   */
+  Output create(TezTask task);
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Processor;
+import org.apache.tez.common.TezTask;
+
+/**
+ * Factory producing the {@link Processor} used by a task.
+ */
+public interface ProcessorFactory {
+
+  /**
+   * @param task the task the processor is for
+   * @return the {@link Processor} to use for {@code task}
+   */
+  Processor create(TezTask task);
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TaskFactory.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+
+/**
+ * Factory assembling a {@link Task} from its input, processor and output.
+ */
+public interface TaskFactory {
+
+  /**
+   * @param in the input the task reads from
+   * @param processor the processor that drives the task
+   * @param out the output the task writes to
+   * @return a {@link Task} combining the three parts
+   */
+  Task create(Input in, Processor processor, Output out);
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Task;
+import org.apache.tez.common.TezTask;
+
+/**
+ * Entry point that builds a runnable {@link Task} for a given task context.
+ */
+public interface TezEngineFactory {
+
+  /**
+   * @param taskContext the task being set up
+   * @return the {@link Task} to execute for {@code taskContext}
+   */
+  Task createTask(TezTask taskContext);
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.runtime;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.common.TezTask;
+
+import com.google.inject.Inject;
+
+/**
+ * Default {@link TezEngineFactory}. It builds a task's input, output and
+ * processor with the injected factories and combines them through the
+ * injected {@link TaskFactory}.
+ */
+public class TezEngineFactoryImpl implements TezEngineFactory {
+
+  private final InputFactory inputFactory;
+  private final ProcessorFactory processorFactory;
+  private final OutputFactory outputFactory;
+  private final TaskFactory taskFactory;
+
+  @Inject
+  public TezEngineFactoryImpl(
+      InputFactory inputFactory,
+      ProcessorFactory processorFactory,
+      OutputFactory outputFactory,
+      TaskFactory taskFactory) {
+    this.inputFactory = inputFactory;
+    this.processorFactory = processorFactory;
+    this.outputFactory = outputFactory;
+    this.taskFactory = taskFactory;
+  }
+
+  /**
+   * @param taskContext the task being set up; passed to each part factory
+   * @return the task assembled by the {@link TaskFactory}
+   */
+  @Override
+  public Task createTask(TezTask taskContext) {
+    // The part factories are invoked in the order input, output, processor.
+    // Keep this order in case a factory has side effects.
+    final Input input = this.inputFactory.create(taskContext);
+    final Output output = this.outputFactory.create(taskContext);
+    final Processor proc = this.processorFactory.create(taskContext);
+    return this.taskFactory.create(input, proc, output);
+  }
+
+}

Added: incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java (added)
+++ incubator/tez/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.task;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link Task} that bundles an {@link Input}, a {@link Processor} and an
+ * {@link Output}. Initialization, execution and shutdown are all delegated to
+ * the processor. This class never initializes or closes the input/output
+ * itself; that is left to the processor.
+ */
+public class RuntimeTask implements Task {
+
+  private final Processor processor;
+  private final Input in;
+  private final Output out;
+
+  // Captured by initialize(); not otherwise read in this class.
+  private Configuration conf;
+  private Master master;
+
+  @Inject
+  public RuntimeTask(
+      @Assisted Processor processor,
+      @Assisted Input in,
+      @Assisted Output out) {
+    this.processor = processor;
+    this.in = in;
+    this.out = out;
+  }
+
+  /** Records {@code conf}/{@code master} and initializes the processor with them. */
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    this.conf = conf;
+    this.master = master;
+    // The processor, not this task, is responsible for initializing in/out.
+    processor.initialize(conf, master);
+  }
+
+  @Override
+  public Input getInput() {
+    return this.in;
+  }
+
+  @Override
+  public Processor getProcessor() {
+    return this.processor;
+  }
+
+  @Override
+  public Output getOutput() {
+    return this.out;
+  }
+
+  /** Runs the processor over this task's input and output. */
+  public void run() throws IOException, InterruptedException {
+    processor.process(this.in, this.out);
+  }
+
+  /** Closes the processor; closing the input/output is left to it. */
+  public void close() throws IOException, InterruptedException {
+    processor.close();
+  }
+
+}

Added: incubator/tez/tez-mapreduce/.classpath
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/.classpath?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/.classpath (added)
+++ incubator/tez/tez-mapreduce/.classpath Fri Mar 15 21:26:36 2013
@@ -0,0 +1,104 @@
+<classpath>
+  <classpathentry kind="src" path="src/test/java" output="target/test-classes" including="**/*.java"/>
+  <classpathentry kind="src" path="src/test/resources" output="target/test-classes" excluding="**/*.java"/>
+  <classpathentry kind="src" path="src/main/java" including="**/*.java"/>
+  <classpathentry kind="output" path="target/classes"/>
+  <classpathentry kind="var" path="M2_REPO/javax/activation/activation/1.1/activation-1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/inject/javax.inject/1/javax.inject-1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
+  <classpathentry kind="var" path="M2_REPO/aopalliance/aopalliance/1.0/aopalliance-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/asm/asm/3.1/asm-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/avro/avro/1.5.3/avro-1.5.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-cli/commons-cli/1.2/commons-cli-1.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-codec/commons-codec/1.4/commons-codec-1.4.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-compress/1.4/commons-compress-1.4.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-digester/commons-digester/1.8/commons-digester-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-el/commons-el/1.0/commons-el-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-math/2.1/commons-math-2.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/commons-net/commons-net/3.1/commons-net-3.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/guava/guava/11.0.2/guava-11.0.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/inject/guice/3.0/guice-3.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/inject/extensions/guice-assistedinject/3.0/guice-assistedinject-3.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-annotations/3.0.0-SNAPSHOT/hadoop-annotations-3.0.0-SNAPSHOT.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-auth/3.0.0-SNAPSHOT/hadoop-auth-3.0.0-SNAPSHOT.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-sources.jar">
+    <attributes>
+      <attribute value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-javadoc.jar!/" name="javadoc_location"/>
+    </attributes>
+  </classpathentry>
+  <classpathentry kind="var" path="M2_REPO/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-jaxrs/1.7.1/jackson-jaxrs-1.7.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-xc/1.7.1/jackson-xc-1.7.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar"/>
+  <classpathentry kind="var" path="M2_REPO/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-core/1.8/jersey-core-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-json/1.8/jersey-json-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-server/1.8/jersey-server-1.8.jar"/>
+  <classpathentry kind="var" path="M2_REPO/net/java/dev/jets3t/jets3t/0.6.1/jets3t-0.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar"/>
+  <classpathentry kind="var" path="M2_REPO/jline/jline/0.9.94/jline-0.9.94.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar"/>
+  <classpathentry kind="var" path="M2_REPO/junit/junit/4.11/junit-4.11.jar"/>
+  <classpathentry kind="var" path="M2_REPO/log4j/log4j/1.2.17/log4j-1.2.17.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/mockito/mockito-all/1.9.5/mockito-all-1.9.5.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar"/>
+  <classpathentry kind="var" path="M2_REPO/com/google/protobuf/protobuf-java/2.4.0a/protobuf-java-2.4.0a.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/xerial/snappy/snappy-java/1.0.3.2/snappy-java-1.0.3.2.jar"/>
+  <classpathentry kind="var" path="M2_REPO/stax/stax-api/1.0.1/stax-api-1.0.1.jar"/>
+  <classpathentry kind="src" path="/tez-api"/>
+  <classpathentry kind="src" path="/tez-common"/>
+  <classpathentry kind="src" path="/tez-engine"/>
+  <classpathentry kind="var" path="M2_REPO/xmlenc/xmlenc/0.52/xmlenc-0.52.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/tukaani/xz/1.0/xz-1.0.jar"/>
+  <classpathentry kind="var" path="M2_REPO/org/apache/zookeeper/zookeeper/3.4.2/zookeeper-3.4.2.jar"/>
+  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+</classpath>
\ No newline at end of file

Added: incubator/tez/tez-mapreduce/.project
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/.project?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/.project (added)
+++ incubator/tez/tez-mapreduce/.project Fri Mar 15 21:26:36 2013
@@ -0,0 +1,17 @@
+<projectDescription>
+  <name>tez-mapreduce</name>
+  <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+  <projects>
+    <project>tez-api</project>
+    <project>tez-common</project>
+    <project>tez-engine</project>
+  </projects>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>
\ No newline at end of file

Added: incubator/tez/tez-mapreduce/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/pom.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/pom.xml (added)
+++ incubator/tez/tez-mapreduce/pom.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.1.0</version>
+  </parent>
+  <artifactId>tez-mapreduce</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-engine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <!-- Not really needed here; only needed in the AM. Pulling all MapReduce dependencies into the tez-mapreduce module. -->
+    <!-- Needed to determine shuffle metadata. -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+    </dependency>
+    <!-- Not really needed here; only needed in the AM. Pulling all MapReduce dependencies into the tez-mapreduce module. -->
+    <!-- Needed for the tez2 API, JobId, etc. It should be possible to remove part of this dependency in the new AM (later). -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-assistedinject</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+@InterfaceAudience.Private
+public class LocalClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * Creates a {@link LocalJobRunner} when the configured framework
+   * ({@link MRConfig#FRAMEWORK_NAME}, defaulting to
+   * {@link MRConfig#LOCAL_FRAMEWORK_NAME}) is the local framework.
+   * As a side effect, sets {@code mapreduce.job.maps} to 1 on {@code conf}.
+   *
+   * @param conf the job configuration; modified as described above
+   * @return a new local runner, or {@code null} if another framework is
+   *         configured
+   * @throws IOException if {@link JTConfig#JT_IPC_ADDRESS} is set to
+   *         anything other than {@code "local"}
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    final String frameworkName =
+        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(frameworkName)) {
+      // Not ours to handle; let another provider claim it.
+      return null;
+    }
+
+    final String jobTrackerAddress = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+    if (!"local".equals(jobTrackerAddress)) {
+      throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+          + "\" configuration value for LocalJobRunner : \""
+          + jobTrackerAddress + "\"");
+    }
+
+    conf.setInt("mapreduce.job.maps", 1);
+    return new LocalJobRunner(conf);
+  }
+
+  /** Always {@code null}: the local job runner is not reached over a socket. */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
+    return null;
+  }
+
+  /** Nothing to release for the local job runner. */
+  @Override
+  public void close(ClientProtocol clientProtocol) {
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A helper class for managing the distributed cache for {@link LocalJobRunner}.
+ * <p>
+ * {@link #setup(JobConf)} downloads every distributed-cache resource named in
+ * the job configuration into a local directory and symlinks each one into the
+ * current working directory ({@code user.dir}) under its link name. It then
+ * writes the localized locations back into the configuration.
+ * {@link #close()} removes the symlinks this instance created and deletes the
+ * localized copies.
+ */
+@SuppressWarnings("deprecation")
+class LocalDistributedCacheManager {
+  public static final Log LOG =
+    LogFactory.getLog(LocalDistributedCacheManager.class);
+
+  /** URI strings of localized archives, in localization order. */
+  private final List<String> localArchives = new ArrayList<String>();
+  /** URI strings of localized (non-archive) files, in localization order. */
+  private final List<String> localFiles = new ArrayList<String>();
+  /** Local paths of localized resources that belong on the task classpath. */
+  private final List<String> localClasspaths = new ArrayList<String>();
+
+  /** Symlinks created by this instance; deleted again by {@link #close()}. */
+  private final List<File> symlinksCreated = new ArrayList<File>();
+
+  private boolean setupCalled = false;
+
+  /**
+   * Set up the distributed cache by localizing the resources, and updating
+   * the configuration with references to the localized resources.
+   *
+   * @param conf the job configuration. It is read for the distributed-cache
+   *        settings and updated with {@link MRJobConfig#CACHE_LOCALARCHIVES}
+   *        and {@link MRJobConfig#CACHE_LOCALFILES} when any archives or
+   *        files were localized.
+   * @throws IOException if a resource cannot be resolved or downloaded, or
+   *         if this thread is interrupted while waiting for a download. In
+   *         the interrupted case, the interrupt status is restored.
+   * @throws IllegalArgumentException if a resource has type
+   *         {@link LocalResourceType#PATTERN}, which is not supported here
+   */
+  public void setup(JobConf conf) throws IOException {
+    File workDir = new File(System.getProperty("user.dir"));
+
+    // Generate YARN local resources objects corresponding to the distributed
+    // cache configuration
+    Map<String, LocalResource> localResources =
+      new LinkedHashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+
+    // Find which resources are to be put on the local classpath, keyed by the
+    // path component of their qualified, resolved remote location.
+    Map<String, Path> classpaths = new HashMap<String, Path>();
+    addResolvedClasspaths(DistributedCache.getArchiveClassPaths(conf), conf,
+        classpaths);
+    addResolvedClasspaths(DistributedCache.getFileClassPaths(conf), conf,
+        classpaths);
+
+    // Localize the resources
+    LocalDirAllocator localDirAllocator =
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    ExecutorService exec = null;
+    try {
+      ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("LocalDistributedCacheManager Downloader #%d")
+      .build();
+      exec = Executors.newCachedThreadPool(tf);
+      Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
+      // Submit every download first so they can run concurrently, then
+      // collect the results in the iteration order of localResources.
+      Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
+      for (LocalResource resource : localResources.values()) {
+        Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
+            destPath, resource, new Random());
+        Future<Path> future = exec.submit(download);
+        resourcesToPaths.put(resource, future);
+      }
+      for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+        LocalResource resource = entry.getValue();
+        Path path;
+        try {
+          path = resourcesToPaths.get(resource).get();
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to callers before converting it.
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        }
+        String pathString = path.toUri().toString();
+        String link = entry.getKey();
+        String target = new File(path.toUri()).getPath();
+        symlink(workDir, target, link);
+
+        if (resource.getType() == LocalResourceType.ARCHIVE) {
+          localArchives.add(pathString);
+        } else if (resource.getType() == LocalResourceType.FILE) {
+          localFiles.add(pathString);
+        } else if (resource.getType() == LocalResourceType.PATTERN) {
+          //PATTERN is not currently used in local mode
+          throw new IllegalArgumentException("Resource type PATTERN is not " +
+              "implemented yet. " + resource.getResource());
+        }
+        Path resourcePath;
+        try {
+          resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+        } catch (URISyntaxException e) {
+          throw new IOException(e);
+        }
+        LOG.info(String.format("Localized %s as %s", resourcePath, path));
+        String cp = resourcePath.toUri().getPath();
+        if (classpaths.containsKey(cp)) {
+          localClasspaths.add(path.toUri().getPath());
+        }
+      }
+    } finally {
+      if (exec != null) {
+        exec.shutdown();
+      }
+    }
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+          .arrayToString(localArchives.toArray(new String[localArchives
+              .size()])));
+    }
+    if (!localFiles.isEmpty()) {
+      // Size the array from localFiles itself. An array sized from another
+      // list could be larger, leaving trailing null elements in it.
+      conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
+          .arrayToString(localFiles.toArray(new String[localFiles
+              .size()])));
+    }
+    setupCalled = true;
+  }
+
+  /**
+   * Qualifies each path against its own filesystem, resolves it, and records
+   * it in {@code classpaths} keyed by the URI path component.
+   * A {@code null} array is ignored.
+   */
+  private static void addResolvedClasspaths(Path[] paths, JobConf conf,
+      Map<String, Path> classpaths) throws IOException {
+    if (paths == null) {
+      return;
+    }
+    for (Path p : paths) {
+      FileSystem remoteFS = p.getFileSystem(conf);
+      Path resolved = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+          remoteFS.getWorkingDirectory()));
+      classpaths.put(resolved.toUri().getPath(), resolved);
+    }
+  }
+
+  /**
+   * Utility method for creating a symlink and warning on errors.
+   * <p>
+   * The link is created as {@code workDir/link} pointing at {@code target}.
+   * If link is null, does nothing. If something already exists at the link
+   * location, nothing is created. A successfully created link is remembered
+   * so that {@link #close()} can delete it.
+   */
+  private void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+              link));
+        } else {
+          symlinksCreated.add(new File(link));
+        }
+      }
+    }
+  }
+
+  /**
+   * Are there resources that should be added to the classpath?
+   * Must be called after {@link #setup(JobConf)}.
+   *
+   * @return true if at least one localized resource belongs on the classpath
+   * @throws IllegalStateException if {@code setup()} has not completed
+   */
+  public boolean hasLocalClasspaths() {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "hasLocalClasspaths() should be called after setup()");
+    }
+    return !localClasspaths.isEmpty();
+  }
+
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   *
+   * @param parent the parent class loader of the returned loader
+   * @return a {@link URLClassLoader} over the localized classpath entries,
+   *         created inside a privileged block
+   * @throws MalformedURLException if a localized path cannot become a URL
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent)
+      throws MalformedURLException {
+    final URL[] urls = new URL[localClasspaths.size()];
+    for (int i = 0; i < localClasspaths.size(); ++i) {
+      urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
+      LOG.info(urls[i]);
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }
+    });
+  }
+
+  /**
+   * Deletes the symlinks created by this instance, warning on any that
+   * cannot be removed. Then recursively deletes every localized archive and
+   * file through the local {@link FileContext}.
+   *
+   * @throws IOException if deleting a localized resource fails
+   */
+  public void close() throws IOException {
+    for (File symlink : symlinksCreated) {
+      if (!symlink.delete()) {
+        LOG.warn("Failed to delete symlink created by the local job runner: " +
+            symlink);
+      }
+    }
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    for (String archive : localArchives) {
+      localFSFileContext.delete(new Path(archive), true);
+    }
+    for (String file : localFiles) {
+      localFSFileContext.delete(new Path(file), true);
+    }
+  }
+}