You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/09 03:28:06 UTC

svn commit: r1465852 [14/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/ c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/ c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/ c++/utils/m4/ core/src/main/java/org/apache/hama/...

Added: hama/trunk/c++/utils/m4/hadoop_utils.m4
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/utils/m4/hadoop_utils.m4?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/utils/m4/hadoop_utils.m4 (added)
+++ hama/trunk/c++/utils/m4/hadoop_utils.m4 Tue Apr  9 01:28:04 2013
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# hadoop_utils.m4
+
+# Check whether the install program supports -C and, if so, use "install -C"
+# for the headers; otherwise every install updates the timestamps on the
+# installed headers, which forces downstream libraries to recompile.
+# The probe uses conftest.* scratch names and discards the probe's output.
+AC_DEFUN([CHECK_INSTALL_CFLAG],[
+AC_REQUIRE([AC_PROG_INSTALL])
+touch conftest.src
+if $INSTALL -C conftest.src conftest.dst >/dev/null 2>&1; then
+  INSTALL_DATA="$INSTALL_DATA -C"
+fi
+rm -f conftest.src conftest.dst
+])
+
+# Set up what hadoop utils needs to compile (AC_GNU_SOURCE, AC_SYS_LARGEFILE)
+AC_DEFUN([HADOOP_UTILS_SETUP],[
+AC_REQUIRE([AC_GNU_SOURCE])
+AC_REQUIRE([AC_SYS_LARGEFILE])
+])
+
+# Define --with-hadoop-utils=<dir>; HADOOP_UTILS_PREFIX defaults to ${prefix}
+AC_DEFUN([USE_HADOOP_UTILS],[
+AC_REQUIRE([HADOOP_UTILS_SETUP])
+AC_ARG_WITH([hadoop-utils],
+            [AS_HELP_STRING([--with-hadoop-utils=<dir>],
+                            [directory to get hadoop_utils from])],
+            [HADOOP_UTILS_PREFIX="$withval"],
+            [HADOOP_UTILS_PREFIX="\${prefix}"])
+AC_SUBST([HADOOP_UTILS_PREFIX])
+])
+# Require pthread and libssl; messages are quoted so their commas survive
+AC_DEFUN([HADOOP_PIPES_SETUP],[
+AC_CHECK_HEADERS([pthread.h], [],
+  [AC_MSG_ERROR([Please check if you have installed the pthread library])])
+AC_CHECK_LIB([pthread], [pthread_create], [],
+  [AC_MSG_ERROR([Cannot find libpthread.so, please check])])
+AC_CHECK_LIB([ssl], [HMAC_Init], [],
+  [AC_MSG_ERROR([Cannot find libssl.so, please check])])
+])
+
+# Define --with-hadoop-pipes=<dir>; HADOOP_PIPES_PREFIX defaults to ${prefix}
+AC_DEFUN([USE_HADOOP_PIPES],[
+AC_REQUIRE([USE_HADOOP_UTILS])
+AC_REQUIRE([HADOOP_PIPES_SETUP])
+AC_ARG_WITH([hadoop-pipes],
+            [AS_HELP_STRING([--with-hadoop-pipes=<dir>],
+                            [directory to get hadoop pipes from])],
+            [HADOOP_PIPES_PREFIX="$withval"],
+            [HADOOP_PIPES_PREFIX="\${prefix}"])
+AC_SUBST([HADOOP_PIPES_PREFIX])
+])

Added: hama/trunk/c++/utils/missing
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/utils/missing?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/utils/missing (added)
+++ hama/trunk/c++/utils/missing Tue Apr  9 01:28:04 2013
@@ -0,0 +1,360 @@
+#! /bin/sh
+# Common stub for a few missing GNU programs while installing.
+
+scriptversion=2005-06-08.21
+
+# Copyright (C) 1996, 1997, 1999, 2000, 2002, 2003, 2004, 2005
+#   Free Software Foundation, Inc.
+# Originally by Fran,cois Pinard <pi...@iro.umontreal.ca>, 1996.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+if test $# -eq 0; then
+  echo 1>&2 "Try \`$0 --help' for more information"
+  exit 1
+fi
+
+run=:
+
+# In the cases where this matters, `missing' is being run in the
+# srcdir already.
+if test -f configure.ac; then
+  configure_ac=configure.ac
+else
+  configure_ac=configure.in
+fi
+
+msg="missing on your system"
+
+case "$1" in
+--run)
+  # Try to run requested program, and just exit if it succeeds.
+  run=
+  shift
+  "$@" && exit 0
+  # Exit code 63 means version mismatch.  This often happens
+  # when the user tries to use an ancient version of a tool on
+  # a file that requires a minimum version.  In this case
+  # we should proceed as if the program had been absent, or
+  # if --run hadn't been passed.
+  if test $? = 63; then
+    run=:
+    msg="probably too old"
+  fi
+  ;;
+
+  -h|--h|--he|--hel|--help)
+    echo "\
+$0 [OPTION]... PROGRAM [ARGUMENT]...
+
+Handle \`PROGRAM [ARGUMENT]...' for when PROGRAM is missing, or return an
+error status if there is no known handling for PROGRAM.
+
+Options:
+  -h, --help      display this help and exit
+  -v, --version   output version information and exit
+  --run           try to run the given command, and emulate it if it fails
+
+Supported PROGRAM values:
+  aclocal      touch file \`aclocal.m4'
+  autoconf     touch file \`configure'
+  autoheader   touch file \`config.h.in'
+  automake     touch all \`Makefile.in' files
+  bison        create \`y.tab.[ch]', if possible, from existing .[ch]
+  flex         create \`lex.yy.c', if possible, from existing .c
+  help2man     touch the output file
+  lex          create \`lex.yy.c', if possible, from existing .c
+  makeinfo     touch the output file
+  tar          try tar, gnutar, gtar, then tar without non-portable flags
+  yacc         create \`y.tab.[ch]', if possible, from existing .[ch]
+
+Send bug reports to <bu...@gnu.org>."
+    exit $?
+    ;;
+
+  -v|--v|--ve|--ver|--vers|--versi|--versio|--version)
+    echo "missing $scriptversion (GNU Automake)"
+    exit $?
+    ;;
+
+  -*)
+    echo 1>&2 "$0: Unknown \`$1' option"
+    echo 1>&2 "Try \`$0 --help' for more information"
+    exit 1
+    ;;
+
+esac
+
+# Now exit if we have it, but it failed.  Also exit now if we
+# don't have it and --version was passed (most likely to detect
+# the program).
+case "$1" in
+  lex|yacc)
+    # Not GNU programs, they don't have --version.
+    ;;
+
+  tar)
+    if test -n "$run"; then
+       echo 1>&2 "ERROR: \`tar' requires --run"
+       exit 1
+    elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+       exit 1
+    fi
+    ;;
+
+  *)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+       # Could not run --version or --help.  This is probably someone
+       # running `$TOOL --version' or `$TOOL --help' to check whether
+       # $TOOL exists and not knowing $TOOL uses missing.
+       exit 1
+    fi
+    ;;
+esac
+
+# If it does not exist, or fails to run (possibly an outdated version),
+# try to emulate it.
+case "$1" in
+  aclocal*)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acinclude.m4' or \`${configure_ac}'.  You might want
+         to install the \`Automake' and \`Perl' packages.  Grab them from
+         any GNU archive site."
+    touch aclocal.m4
+    ;;
+
+  autoconf)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`${configure_ac}'.  You might want to install the
+         \`Autoconf' and \`GNU m4' packages.  Grab them from any GNU
+         archive site."
+    touch configure
+    ;;
+
+  autoheader)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acconfig.h' or \`${configure_ac}'.  You might want
+         to install the \`Autoconf' and \`GNU m4' packages.  Grab them
+         from any GNU archive site."
+    files=`sed -n 's/^[ ]*A[CM]_CONFIG_HEADER(\([^)]*\)).*/\1/p' ${configure_ac}`
+    test -z "$files" && files="config.h"
+    touch_files=
+    for f in $files; do
+      case "$f" in
+      *:*) touch_files="$touch_files "`echo "$f" |
+				       sed -e 's/^[^:]*://' -e 's/:.*//'`;;
+      *) touch_files="$touch_files $f.in";;
+      esac
+    done
+    touch $touch_files
+    ;;
+
+  automake*)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`Makefile.am', \`acinclude.m4' or \`${configure_ac}'.
+         You might want to install the \`Automake' and \`Perl' packages.
+         Grab them from any GNU archive site."
+    find . -type f -name Makefile.am -print |
+	   sed 's/\.am$/.in/' |
+	   while read f; do touch "$f"; done
+    ;;
+
+  autom4te)
+    echo 1>&2 "\
+WARNING: \`$1' is needed, but is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.
+         You can get \`$1' as part of \`Autoconf' from any GNU
+         archive site."
+
+    file=`echo "$*" | sed -n 's/.*--output[ =]*\([^ ]*\).*/\1/p'`
+    test -z "$file" && file=`echo "$*" | sed -n 's/.*-o[ ]*\([^ ]*\).*/\1/p'`
+    if test -f "$file"; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo "#! /bin/sh"
+	echo "# Created by GNU Automake missing as a replacement of"
+	echo "#  $ $@"
+	echo "exit 0"
+	chmod +x $file
+	exit 1
+    fi
+    ;;
+
+  bison|yacc)
+    echo 1>&2 "\
+WARNING: \`$1' $msg.  You should only need it if
+         you modified a \`.y' file.  You may need the \`Bison' package
+         in order for those modifications to take effect.  You can get
+         \`Bison' from any GNU archive site."
+    rm -f y.tab.c y.tab.h
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.y)
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.c
+	    fi
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/h/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.h
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f y.tab.h ]; then
+	echo >y.tab.h
+    fi
+    if [ ! -f y.tab.c ]; then
+	echo 'main() { return 0; }' >y.tab.c
+    fi
+    ;;
+
+  lex|flex)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.l' file.  You may need the \`Flex' package
+         in order for those modifications to take effect.  You can get
+         \`Flex' from any GNU archive site."
+    rm -f lex.yy.c
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.l)
+	    SRCFILE=`echo "$LASTARG" | sed 's/l$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" lex.yy.c
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f lex.yy.c ]; then
+	echo 'main() { return 0; }' >lex.yy.c
+    fi
+    ;;
+
+  help2man)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+	 you modified a dependency of a manual page.  You may need the
+	 \`Help2man' package in order for those modifications to take
+	 effect.  You can get \`Help2man' from any GNU archive site."
+
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+	file=`echo "$*" | sed -n 's/.*--output=\([^ ]*\).*/\1/p'`
+    fi
+    if [ -f "$file" ]; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo ".ab help2man is required to generate this page"
+	exit 1
+    fi
+    ;;
+
+  makeinfo)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.texi' or \`.texinfo' file, or any other file
+         indirectly affecting the aspect of the manual.  The spurious
+         call might also be the consequence of using a buggy \`make' (AIX,
+         DU, IRIX).  You might want to install the \`Texinfo' package or
+         the \`GNU make' package.  Grab either from any GNU archive site."
+    # The file to touch is that specified with -o ...
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+      # ... or it is the one specified with @setfilename ...
+      infile=`echo "$*" | sed 's/.* \([^ ]*\) *$/\1/'`
+      file=`sed -n '/^@setfilename/ { s/.* \([^ ]*\) *$/\1/; p; q; }' $infile`
+      # ... or it is derived from the source name (dir/f.texi becomes f.info)
+      test -z "$file" && file=`echo "$infile" | sed 's,.*/,,;s,.[^.]*$,,'`.info
+    fi
+    # If the file does not exist, the user really needs makeinfo;
+    # let's fail without touching anything.
+    test -f $file || exit 1
+    touch $file
+    ;;
+
+  tar)
+    shift
+
+    # We have already tried tar in the generic part.
+    # Look for gnutar/gtar before invocation to avoid ugly error
+    # messages.
+    if (gnutar --version > /dev/null 2>&1); then
+       gnutar "$@" && exit 0
+    fi
+    if (gtar --version > /dev/null 2>&1); then
+       gtar "$@" && exit 0
+    fi
+    firstarg="$1"
+    if shift; then
+	case "$firstarg" in
+	*o*)
+	    firstarg=`echo "$firstarg" | sed s/o//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+	case "$firstarg" in
+	*h*)
+	    firstarg=`echo "$firstarg" | sed s/h//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+    fi
+
+    echo 1>&2 "\
+WARNING: I can't seem to be able to run \`tar' with the given arguments.
+         You may want to install GNU tar or Free paxutils, or check the
+         command line arguments."
+    exit 1
+    ;;
+
+  *)
+    echo 1>&2 "\
+WARNING: \`$1' is needed, and is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.  Check the \`README' file,
+         it often tells you about the needed prerequisites for installing
+         this package.  You may also peek at any GNU archive site, in case
+         some other package would contain this missing \`$1' program."
+    exit 1
+    ;;
+esac
+
+exit 0
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Apr  9 01:28:04 2013
@@ -23,6 +23,7 @@ import java.net.URLDecoder;
 import java.util.Enumeration;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -31,6 +32,13 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.pipes.PipesApplicable;
+import org.apache.hama.pipes.PipesApplication;
+import org.apache.hama.pipes.PipesPartitioner;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A BSP job configuration.
@@ -47,6 +55,9 @@ public class BSPJob extends BSPJobContex
   private BSPJobClient jobClient;
   private RunningJob info;
 
+  private PipesApplication<?, ?, ?, ?, ?> pipesApp = null;
+  private static final Log LOG = LogFactory.getLog(BSPJob.class);
+
   public BSPJob() throws IOException {
     this(new HamaConfiguration());
   }
@@ -241,6 +252,31 @@ public class BSPJob extends BSPJobContex
     conf.set(name, value);
   }
 
+  public void setBoolean(String name, boolean value) {
+    conf.setBoolean(name, value);
+  }
+
+  public boolean getBoolean(String name, boolean defaultValue) {
+    return conf.getBoolean(name, defaultValue);
+  }
+
+  public final <K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> getPipesApplication() {
+    if (pipesApp == null)
+      pipesApp = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
+
+    return pipesApp;
+  }
+
+  public void cleanup() {
+    try {
+      // Close client pipesApplication
+      if (this.getPipesApplication() != null)
+        this.getPipesApplication().cleanup();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
   public void setNumBspTask(int tasks) {
     conf.setInt("bsp.peers.num", tasks);
   }
@@ -381,9 +417,32 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return ReflectionUtils.newInstance(conf.getClass(
+
+    Class<? extends Partitioner> partitionerClass = conf.getClass(
         Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
-        Partitioner.class), conf);
+        Partitioner.class);
+
+    LOG.info("DEBUG: " + Constants.RUNTIME_PARTITIONING_CLASS + ": "
+        + partitionerClass.toString());
+
+    Partitioner partitioner = ReflectionUtils.newInstance(partitionerClass,
+        conf);
+
+    /* PipesPartitioner usage */
+    if (PipesPartitioner.class.equals(partitionerClass)) {
+      ((PipesApplicable) partitioner)
+          .setApplication(this.getPipesApplication());
+
+      try {
+        DistributedCacheUtil.moveLocalFiles(conf);
+        this.getPipesApplication().start(conf);
+      } catch (IOException e) {
+        LOG.error(e);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+    }
+    return partitioner;
   }
 
   @SuppressWarnings("rawtypes")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Apr  9 01:28:04 2013
@@ -769,6 +769,9 @@ public class BSPJobClient extends Config
     // TODO if error found, kill job
     // running.killJob();
     jc.close();
+
+    // Added cleanup for Client PipesApp and DistributedCache
+    job.cleanup();
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr  9 01:28:04 2013
@@ -19,7 +19,8 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -28,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -47,6 +47,7 @@ import org.apache.hama.bsp.sync.PeerSync
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
 import org.apache.hama.util.KeyValuePair;
 
 /**
@@ -242,45 +243,6 @@ public final class BSPPeerImpl<K1, V1, K
     }
   }
 
-  /**
-   * Transfers DistributedCache files into the local cache files. Also creates
-   * symbolic links for URIs specified with a fragment if
-   * DistributedCache.getSymlinks() is true.
-   * 
-   * @throws IOException If a DistributedCache file cannot be found.
-   */
-  public final void moveCacheFiles() throws IOException {
-    StringBuilder files = new StringBuilder();
-    boolean first = true;
-    if (DistributedCache.getCacheFiles(conf) != null) {
-      for (URI uri : DistributedCache.getCacheFiles(conf)) {
-        if (uri != null) {
-          if (!first) {
-            files.append(",");
-          }
-          if (null != uri.getFragment() && DistributedCache.getSymlink(conf)) {
-
-            FileUtil.symLink(uri.getPath(), uri.getFragment());
-            files.append(uri.getFragment()).append(",");
-          }
-          FileSystem hdfs = FileSystem.get(conf);
-          Path pathSrc = new Path(uri.getPath());
-          if (hdfs.exists(pathSrc)) {
-            LocalFileSystem local = FileSystem.getLocal(conf);
-            Path pathDst = new Path(local.getWorkingDirectory(),
-                pathSrc.getName());
-            hdfs.copyToLocalFile(pathSrc, pathDst);
-            files.append(pathDst.toUri().getPath());
-          }
-          first = false;
-        }
-      }
-    }
-    if (files.length() > 0) {
-      DistributedCache.addLocalFiles(conf, files.toString());
-    }
-  }
-
   @SuppressWarnings("unchecked")
   public final void initInput() throws IOException {
     InputSplit inputSplit = null;
@@ -370,12 +332,21 @@ public final class BSPPeerImpl<K1, V1, K
       }
     };
 
+    /* Move Files to HDFS */
     try {
-      moveCacheFiles();
+      DistributedCacheUtil.moveLocalFiles(this.conf);
     } catch (Exception e) {
       LOG.error(e);
     }
 
+    /* Add additional jars to Classpath */
+    // LOG.info("conf.get(tmpjars): " + this.conf.get("tmpjars"));
+    URL[] libjars = DistributedCacheUtil.addJarsToJobClasspath(this.conf);
+
+    // ATTENTION bspJob.getConf() != this.conf
+    if (libjars != null)
+      bspJob.conf.setClassLoader(new URLClassLoader(libjars, bspJob.conf
+          .getClassLoader()));
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Apr  9 01:28:04 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.pipes.PipesApplicable;
+import org.apache.hama.pipes.PipesBSP;
 
 /**
  * Base class for tasks.
@@ -156,10 +158,17 @@ public final class BSPTask extends Task 
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
       throws Exception {
 
+    Class<?> workClass = job.getConfiguration().getClass("bsp.work.class",
+        BSP.class);
+
     BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
-        .newInstance(
-            job.getConfiguration().getClass("bsp.work.class", BSP.class),
-            job.getConfiguration());
+        .newInstance(workClass, job.getConfiguration());
+
+    LOG.debug("bsp.work.class: " + workClass.toString());
+
+    /* Setup PipesApplication if workClass is matching */
+    if (PipesBSP.class.equals(workClass))
+      ((PipesApplicable) bsp).setApplication(job.getPipesApplication());
 
     // The policy is to throw the first exception and log the remaining.
     Exception firstException = null;

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a
+ * separator character. The separator can be specified in config file under the
+ * attribute name key.value.separator.in.input.line. The default separator is
+ * the tab character ('\t').
+ */
+public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
+
+  private final LineRecordReader lineRecordReader;
+
+  /** Separator byte: the first character of the configured string. */
+  private byte separator = (byte) '\t';
+
+  /** Reusable holders for the wrapped reader's key and line value. */
+  private final LongWritable dummyKey;
+  private final Text innerValue;
+
+  @SuppressWarnings("rawtypes")
+  public Class getKeyClass() {
+    return Text.class;
+  }
+
+  public Text createKey() {
+    return new Text();
+  }
+
+  public Text createValue() {
+    return new Text();
+  }
+
+  /**
+   * @param job configuration; key.value.separator.in.input.line overrides the
+   *          tab separator (an empty value keeps the tab default, because
+   *          charAt(0) on an empty string would throw)
+   * @param split the file split to read lines from
+   */
+  public KeyValueLineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+    lineRecordReader = new LineRecordReader(job, split);
+    dummyKey = lineRecordReader.createKey();
+    innerValue = lineRecordReader.createValue();
+    String sepStr = job.get("key.value.separator.in.input.line", "\t");
+    if (!sepStr.isEmpty())
+      this.separator = (byte) sepStr.charAt(0);
+  }
+
+  /** Index of the first sep byte in utf[start, start + length), or -1. */
+  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
+    for (int i = start; i < (start + length); i++) {
+      if (utf[i] == sep) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Reads the next line into key and value; a line without a separator
+   * becomes the key with an empty value.
+   * @return false when the underlying reader returns no further line
+   */
+  public synchronized boolean next(Text key, Text value) throws IOException {
+    if (!lineRecordReader.next(dummyKey, innerValue)) {
+      return false;
+    }
+    byte[] line = innerValue.getBytes();
+    int lineLen = innerValue.getLength();
+    int pos = findSeparator(line, 0, lineLen, this.separator);
+    if (pos == -1) {
+      key.set(line, 0, lineLen);
+      value.set("");
+    } else {
+      // Text.set(byte[], int, int) copies the range; no temp arrays needed.
+      key.set(line, 0, pos);
+      value.set(line, pos + 1, lineLen - pos - 1);
+    }
+    return true;
+  }
+
+  public float getProgress() {
+    return lineRecordReader.getProgress();
+  }
+
+  public synchronized long getPos() throws IOException {
+    return lineRecordReader.getPos();
+  }
+
+  public synchronized void close() throws IOException {
+    lineRecordReader.close();
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either linefeed or carriage-return are used to signal end of line. Each line
+ * is divided into key and value parts by a separator byte. If no such byte
+ * exists, the key will be the entire line and value will be empty.
+ */
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
+
+  @Override
+  public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
+      BSPJob job) throws IOException {
+    return new KeyValueLineRecordReader(job.conf, (FileSplit) genericSplit);
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Apr  9 01:28:04 2013
@@ -22,7 +22,9 @@ import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -46,10 +48,26 @@ public class TaskLog {
     }
   }
 
+  /**
+   * Returns the log file of a task attempt. The file lives below LOG_DIR in a
+   * directory named after the job id and is named after the task attempt id.
+   *
+   * @param taskid the task attempt whose log file is requested
+   * @param filter the kind of log; STDERR selects the ".err" extension, every
+   *          other value selects ".log"
+   * @return the file handle; nothing is created on disk by this method
+   */
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
     // TODO clean up the log path and type.
     return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString()
-        + ".log");
+        + ((filter == LogName.STDERR) ? ".err" : ".log"));
+  }
+
+  /**
+   * Returns a log file for a locally started process that has no task attempt
+   * id. Directory ("job_" prefix) and file name ("local_" prefix) are both
+   * derived from the current time formatted with the given pattern.
+   *
+   * Note that two separate calls (e.g. one for STDOUT and one for STDERR) each
+   * read the clock on their own and may therefore yield different names.
+   *
+   * @param filter the kind of log; STDERR selects the ".err" extension, every
+   *          other value selects ".log"
+   * @param stringPattern a {@link SimpleDateFormat} pattern used to render the
+   *          current time
+   * @return the file handle; nothing is created on disk by this method
+   */
+  public static File getLocalTaskLogFile(LogName filter, String stringPattern) {
+    // TODO clean up the log path and type.
+    // Render the timestamp exactly once: formatting two independent Date
+    // instances could produce a directory name and a file name that disagree
+    // when the clock ticks between the two calls.
+    String timestamp = new SimpleDateFormat(stringPattern).format(new Date());
+    return new File(LOG_DIR, "job_" + timestamp + "/" + "local_" + timestamp
+        + ((filter == LogName.STDERR) ? ".err" : ".log"));
   }
 
   /**

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Contract for classes that need access to the {@link PipesApplication}
+ * object, e.g., PipesBSP and PipesPartitioner.
+ */
+public interface PipesApplicable {
+
+  /**
+   * Hands the pipes application over to the implementing object.
+   *
+   * @param pipesApp the application to use; its type parameters are left open
+   */
+  void setApplication(
+      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp);
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** MODIFIED FOR GPGPU Usage! **/
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskLog;
+import org.apache.hama.pipes.protocol.BinaryProtocol;
+import org.apache.hama.pipes.protocol.DownwardProtocol;
+import org.apache.hama.pipes.protocol.StreamingProtocol;
+
+/**
+ * This class is responsible for launching and communicating with the child
+ * process.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class PipesApplication<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
+
+  private static final Log LOG = LogFactory.getLog(PipesApplication.class
+      .getName());
+  private ServerSocket serverSocket;
+  private Process process;
+  private Socket clientSocket;
+
+  private DownwardProtocol<K1, V1, K2, V2> downlink;
+  private boolean streamingEnabled = false;
+
+  static final boolean WINDOWS = System.getProperty("os.name").startsWith(
+      "Windows");
+
+  /**
+   * Creates an application object without launching anything; the child
+   * process is forked by one of the start methods.
+   */
+  public PipesApplication() {
+  }
+
+  /**
+   * Assembles the environment variables for the child process from the
+   * configuration. As a side effect the streaming flag is read into
+   * {@code streamingEnabled} and, when streaming is disabled, the server
+   * socket the child connects back to is opened.
+   *
+   * @param conf the configuration to read the settings from
+   * @return the environment entries to add to the child's environment
+   * @throws IOException if the server socket cannot be opened
+   */
+  private Map<String, String> setupEnvironment(Configuration conf)
+      throws IOException {
+
+    this.streamingEnabled = conf.getBoolean("hama.streaming.enabled", false);
+
+    Map<String, String> childEnv = new HashMap<String, String>();
+
+    if (!this.streamingEnabled) {
+      // port 0 asks for an automatically allocated port, which is then
+      // announced to the child through its environment
+      serverSocket = new ServerSocket(0);
+      int commandPort = serverSocket.getLocalPort();
+      childEnv.put("hama.pipes.command.port", Integer.toString(commandPort));
+    }
+
+    // the child gets the JVM's temp directory as TMPDIR
+    childEnv.put("TMPDIR", System.getProperty("java.io.tmpdir"));
+
+    // forward the logging switch as "1" / "0"
+    boolean pipesLogging = conf.getBoolean("hama.pipes.logging", false);
+    childEnv.put("hama.pipes.logging", pipesLogging ? "1" : "0");
+    LOG.debug("DEBUG hama.pipes.logging: " + pipesLogging);
+
+    return childEnv;
+  }
+
+  /**
+   * Builds the command line of the child process from the configuration:
+   * optional interpreter, the executable (first file of the local
+   * DistributedCache) and the optional additional arguments.
+   *
+   * If "hama.pipes.resolve.executable.args" is true, every additional argument
+   * is replaced by the path of the local cache file with the same name;
+   * arguments that match no cache file are skipped (a warning is logged).
+   *
+   * @param conf the configuration to read the settings from
+   * @return the command and its arguments
+   * @throws IOException if no executable can be determined or its permissions
+   *           cannot be changed
+   * @throws InterruptedException declared for the permission change
+   */
+  private List<String> setupCommand(Configuration conf) throws IOException,
+      InterruptedException {
+
+    List<String> cmd = new ArrayList<String>();
+    String interpretor = conf.get("hama.pipes.executable.interpretor");
+    if (interpretor != null) {
+      cmd.add(interpretor);
+    }
+
+    String executable = null;
+    // looked up once and reused for the argument resolution below
+    Path[] localCacheFiles = null;
+    try {
+      localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
+      if (localCacheFiles != null && localCacheFiles.length > 0) {
+        LOG.debug("DEBUG LocalCacheFilesCount: " + localCacheFiles.length);
+        for (Path u : localCacheFiles) {
+          LOG.debug("DEBUG LocalCacheFiles: " + u);
+        }
+
+        executable = localCacheFiles[0].toString();
+
+        LOG.debug("DEBUG: executable: " + executable);
+      }
+    } catch (Exception e) {
+      LOG.error("Executable: " + executable + " fs.default.name: "
+          + conf.get("fs.default.name"));
+
+      // keep the original failure as cause instead of dropping it
+      throw new IOException("Executable is missing!", e);
+    }
+
+    if (executable == null) {
+      // Without any local cache file there is nothing to execute; fail with
+      // the documented IOException instead of a NullPointerException below.
+      LOG.error("Executable: " + executable + " fs.default.name: "
+          + conf.get("fs.default.name"));
+
+      throw new IOException("Executable is missing!");
+    }
+
+    if (!new File(executable).canExecute()) {
+      // LinuxTaskController sets +x permissions on all distcache files already.
+      // In case of DefaultTaskController, set permissions here.
+      FileUtil.chmod(executable, "u+x");
+    }
+
+    cmd.add(executable);
+
+    String additionalArgs = conf.get("hama.pipes.executable.args");
+    // if true, we are resolving filenames with the linked paths in
+    // DistributedCache
+    boolean resolveArguments = conf.getBoolean(
+        "hama.pipes.resolve.executable.args", false);
+    if (additionalArgs != null && !additionalArgs.isEmpty()) {
+      for (String s : additionalArgs.split(" ")) {
+        if (!resolveArguments) {
+          cmd.add(s);
+          continue;
+        }
+
+        boolean resolved = false;
+        for (Path u : localCacheFiles) {
+          if (u.getName().equals(s)) {
+            LOG.info("Resolved argument \"" + s
+                + "\" with fully qualified path \"" + u.toString() + "\"!");
+            cmd.add(u.toString());
+            resolved = true;
+            break;
+          }
+        }
+        if (!resolved) {
+          // the argument is left out as before, but no longer silently
+          LOG.warn("Argument \"" + s
+              + "\" matches no file in the DistributedCache and is skipped!");
+        }
+      }
+    }
+
+    return cmd;
+  }
+
+  /**
+   * Creates the parent directory of the given file if it does not exist yet.
+   * A failure to create it is only logged; the caller continues either way.
+   *
+   * @param file the file whose parent directory is required
+   */
+  private void checkParentFile(File file) {
+    File parent = file.getParentFile();
+    // getParentFile() yields null for a path without a parent component
+    if (parent == null || parent.exists()) {
+      return;
+    }
+    if (parent.mkdirs()) {
+      LOG.debug("File: " + parent.getAbsolutePath() + " created!");
+    } else if (!parent.exists()) {
+      // mkdirs() also returns false when somebody else created the directory
+      // in the meantime, which is fine; anything else is worth a warning
+      LOG.warn("File: " + parent.getAbsolutePath() + " could not be created!");
+    }
+  }
+
+  /**
+   * Start the child process to handle the task for us. Peer is not available
+   * now, start child by using only the configuration! e.g., PipesPartitioner,
+   * no peer is available at this time!
+   *
+   * In streaming mode only the process is forked; no downlink is created
+   * because the streaming protocol is constructed from a peer.
+   *
+   * @param conf task's configuration
+   * @throws InterruptedException
+   * @throws IOException if the child cannot be started; a
+   *           {@link SocketException} if it does not connect back
+   */
+  public void start(Configuration conf) throws IOException,
+      InterruptedException {
+
+    Map<String, String> env = setupEnvironment(conf);
+    List<String> cmd = setupCommand(conf);
+
+    // wrap the command in a stdout/stderr capture
+    File stdout = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDOUT,
+        "yyyyMMdd'_partitioner_'HHmmss");
+    File stderr = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDERR,
+        "yyyyMMdd'_partitioner_'HHmmss");
+
+    // Get the desired maximum length of task's logs.
+    long logLength = TaskLog.getTaskLogLength(conf);
+
+    if (!streamingEnabled) {
+      cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+    } else {
+      // use tee in streaming to get the output to file
+      cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+    }
+
+    /* Check if Parent folders for STDOUT exist */
+    checkParentFile(stdout);
+    LOG.debug("STDOUT: " + stdout.getAbsolutePath());
+    /* Check if Parent folders for STDERR exist */
+    checkParentFile(stderr);
+    LOG.debug("STDERR: " + stderr.getAbsolutePath());
+
+    LOG.debug("DEBUG: cmd: " + cmd);
+    process = runClient(cmd, env); // fork c++ binary
+
+    if (streamingEnabled) {
+      // No server socket exists in streaming mode, so there is nothing to
+      // wait for (touching serverSocket here would be a NullPointerException).
+      return;
+    }
+
+    try {
+      acceptClient();
+
+      downlink = new BinaryProtocol<K1, V1, K2, V2>(conf,
+          clientSocket.getOutputStream(), clientSocket.getInputStream());
+
+      downlink.start();
+
+    } catch (SocketException e) {
+      throw connectFailure(stderr, e);
+    } catch (java.net.SocketTimeoutException e) {
+      // accept() reports an elapsed timeout with SocketTimeoutException,
+      // which is not a SocketException
+      throw connectFailure(stderr, e);
+    }
+  }
+
+  /**
+   * Start the child process to handle the task for us.
+   *
+   * @param peer the current peer including the task's configuration
+   * @throws InterruptedException
+   * @throws IOException if the child cannot be started; a
+   *           {@link SocketException} if it does not connect back
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void start(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
+      throws IOException, InterruptedException {
+
+    Map<String, String> env = setupEnvironment(peer.getConfiguration());
+    List<String> cmd = setupCommand(peer.getConfiguration());
+
+    // wrap the command in a stdout/stderr capture
+    TaskAttemptID taskid = peer.getTaskId();
+    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+
+    // Get the desired maximum length of task's logs.
+    long logLength = TaskLog.getTaskLogLength(peer.getConfiguration());
+
+    if (!streamingEnabled) {
+      cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+    } else {
+      // use tee in streaming to get the output to file
+      cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+    }
+
+    /* Check if Parent folders for STDOUT exist */
+    checkParentFile(stdout);
+    LOG.debug("STDOUT: " + stdout.getAbsolutePath());
+    /* Check if Parent folders for STDERR exist */
+    checkParentFile(stderr);
+    LOG.debug("STDERR: " + stderr.getAbsolutePath());
+
+    LOG.debug("DEBUG: cmd: " + cmd);
+    process = runClient(cmd, env); // fork c++ binary
+
+    try {
+      if (streamingEnabled) {
+        downlink = new StreamingProtocol(peer, process.getOutputStream(),
+            process.getInputStream());
+      } else {
+        acceptClient();
+
+        downlink = new BinaryProtocol<K1, V1, K2, V2>(peer,
+            clientSocket.getOutputStream(), clientSocket.getInputStream());
+      }
+
+      downlink.start();
+
+    } catch (SocketException e) {
+      throw connectFailure(stderr, e);
+    } catch (java.net.SocketTimeoutException e) {
+      // accept() reports an elapsed timeout with SocketTimeoutException,
+      // which is not a SocketException
+      throw connectFailure(stderr, e);
+    }
+  }
+
+  /* Wait up to two seconds for the child to connect to the server socket. */
+  private void acceptClient() throws IOException {
+    LOG.debug("DEBUG: waiting for Client at "
+        + serverSocket.getLocalSocketAddress());
+    serverSocket.setSoTimeout(2000);
+    clientSocket = serverSocket.accept();
+    LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
+  }
+
+  /*
+   * Log the child's captured stderr (best effort) and build the exception
+   * reported to the caller when the child could not be reached.
+   */
+  private SocketException connectFailure(File stderr, IOException cause) {
+    BufferedReader br = null;
+    try {
+      br = new BufferedReader(new InputStreamReader(
+          new FileInputStream(stderr)));
+
+      String inputLine;
+      while ((inputLine = br.readLine()) != null) {
+        LOG.error("PipesApp Error: " + inputLine);
+      }
+    } catch (IOException e) {
+      // e.g. the child never wrote its stderr file; this must not hide the
+      // connection failure itself
+      LOG.warn("Could not read " + stderr.getAbsolutePath() + ": " + e);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException ignored) {
+          // nothing left to do for a reader we only read from
+        }
+      }
+    }
+
+    SocketException failure = new SocketException(
+        "Timout: Client pipes application was not connecting!");
+    failure.initCause(cause);
+    return failure;
+  }
+
+  /**
+   * Get the downward protocol object that can send commands down to the
+   * application.
+   *
+   * @return the downlink proxy, or null as long as no start method has
+   *         assigned one
+   */
+  DownwardProtocol<K1, V1, K2, V2> getDownlink() {
+    return downlink;
+  }
+
+  /**
+   * Wait for the application to finish. Pending downlink output is flushed
+   * first, then the call is delegated to the downlink.
+   *
+   * @return did the application finish correctly?
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  boolean waitForFinish() throws InterruptedException, IOException {
+    // flush first so the child has received everything sent so far
+    downlink.flush();
+    return downlink.waitForFinish();
+  }
+
+  /**
+   * Abort the application and wait for it to finish. If waiting fails, or if
+   * no downlink exists to ask the child to stop, the child process is
+   * destroyed.
+   *
+   * @param t the exception that signalled the problem
+   * @throws IOException A wrapper around the exception that was passed in;
+   *           this method always throws it
+   */
+  void abort(Throwable t) throws IOException {
+    LOG.info("Aborting because of " + StringUtils.stringifyException(t));
+    if (downlink != null) {
+      try {
+        downlink.abort();
+        downlink.flush();
+      } catch (IOException e) {
+        // IGNORE cleanup problems
+      }
+      try {
+        downlink.waitForFinish();
+      } catch (Throwable ignored) {
+        if (process != null) {
+          process.destroy();
+        }
+      }
+    } else if (process != null) {
+      // The downlink was never established (e.g. start failed), so the child
+      // cannot be told to stop; without this guard a NullPointerException
+      // would replace the wrapper that carries the real cause.
+      process.destroy();
+    }
+    IOException wrapper = new IOException("pipe child exception");
+    wrapper.initCause(t);
+    throw wrapper;
+  }
+
+  /**
+   * Clean up the sockets and the downlink if they exist. Every resource is
+   * released even if closing an earlier one fails.
+   *
+   * @throws IOException if closing a socket or the downlink fails
+   */
+  public void cleanup() throws IOException {
+    try {
+      if (serverSocket != null) {
+        serverSocket.close();
+      }
+    } finally {
+      // still close the downlink when the server socket failed to close
+      try {
+        if (downlink != null) {
+          downlink.close();
+        }
+      } catch (InterruptedException ie) {
+        // keep the interrupt visible to the caller's thread
+        Thread.currentThread().interrupt();
+      } finally {
+        // the accepted connection is owned by this object as well
+        if (clientSocket != null) {
+          clientSocket.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Launches the given command as a subprocess. The entries of {@code env}, if
+   * any, are added on top of the environment the process builder starts with.
+   *
+   * @param command the command and its arguments
+   * @param env additional environment entries for the process, may be null
+   * @return a handle on the process
+   * @throws IOException if the process cannot be started
+   */
+  static Process runClient(List<String> command, Map<String, String> env)
+      throws IOException {
+    ProcessBuilder launcher = new ProcessBuilder(command);
+    if (env != null) {
+      Map<String, String> childEnv = launcher.environment();
+      childEnv.putAll(env);
+    }
+    return launcher.start();
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Tue Apr  9 01:28:04 2013
@@ -32,26 +32,53 @@ import org.apache.hama.bsp.sync.SyncExce
  * runtimes.
  */
 public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
-    extends BSP<K1, V1, K2, V2, BytesWritable> {
+    extends BSP<K1, V1, K2, V2, BytesWritable> implements PipesApplicable {
 
   private static final Log LOG = LogFactory.getLog(PipesBSP.class);
-  private Application<K1, V1, K2, V2, BytesWritable> application;
+  private PipesApplication<K1, V1, K2, V2, BytesWritable> application;
 
   @Override
   public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
-    this.application = new Application<K1, V1, K2, V2, BytesWritable>(peer);
-    application.getDownlink().runSetup(false, false);
+    this.application.start(peer);
+
+    this.application.getDownlink().runSetup(false, false);
+
+    try {
+      this.application.waitForFinish();
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
   }
 
   @Override
   public void bsp(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
-    application.getDownlink().runBsp(false, false);
+    this.application.getDownlink().runBsp(false, false);
+
+    try {
+      this.application.waitForFinish();
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
   }
 
+  /**
+   * This method is called after the BSP method. It can be used for cleanup
+   * purposes. Cleanup is guaranteed to be called after the BSP runs, even in
+   * case of exceptions.
+   * 
+   * @param peer Your BSPPeer instance.
+   * @throws IOException
+   */
   @Override
   public void cleanup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException {
@@ -59,15 +86,23 @@ public class PipesBSP<K1 extends Writabl
     application.getDownlink().runCleanup(false, false);
 
     try {
-      application.waitForFinish();
+      this.application.waitForFinish();
     } catch (IOException e) {
       LOG.error(e);
       throw e;
     } catch (InterruptedException e) {
       e.printStackTrace();
     } finally {
-      application.cleanup();
+      this.application.cleanup();
     }
   }
 
+  /**
+   * Stores the pipes application used by this BSP.
+   *
+   * @param pipesApp the application; it is narrowed to this class's type
+   *          parameters with an unchecked cast, so a mismatch is not detected
+   *          here
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setApplication(
+      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
+
+    this.application = (PipesApplication<K1, V1, K2, V2, BytesWritable>) pipesApp;
+  }
+
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.Partitioner;
+
+/**
+ * Partitioner that delegates the decision to a C++ partitioner: the request
+ * travels from the Java Partitioner through the downlink protocol to the C++
+ * Partitioner and the answer comes back the same way.
+ */
+public class PipesPartitioner<K, V> implements Partitioner<K, V>,
+    PipesApplicable {
+
+  private static final Log LOG = LogFactory.getLog(PipesPartitioner.class
+      .getName());
+
+  /* The application whose downlink answers the requests; null until set. */
+  private PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> application;
+
+  /**
+   * Partitions a specific key value mapping to a bucket. Key and value are
+   * handed to the child as their string representations.
+   *
+   * @param key
+   * @param value
+   * @param numTasks
+   * @return a number between 0 and numTasks (exclusive) that tells which
+   *         partition it belongs to; 0 if no application or downlink is
+   *         available or if the request fails with an IOException.
+   */
+  @Override
+  public int getPartition(K key, V value, int numTasks) {
+    try {
+      if (application == null || application.getDownlink() == null) {
+        return 0;
+      }
+      return application.getDownlink().getPartition(key.toString(),
+          value.toString(), numTasks);
+    } catch (IOException e) {
+      LOG.error(e);
+      return 0;
+    }
+  }
+
+  /**
+   * Sets the application whose downlink is asked for the partition.
+   *
+   * @param pipesApp the application to use
+   */
+  @Override
+  public void setApplication(
+      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
+
+    this.application = pipesApp;
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Tue Apr  9 01:28:04 2013
@@ -52,6 +52,7 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.InputFormat;
 import org.apache.hama.bsp.OutputFormat;
 import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
 
 import com.google.common.base.Joiner;
 
@@ -277,6 +278,19 @@ public class Submitter implements Tool {
       throw ie;
     }
     DistributedCache.setCacheFiles(fileCache, job.getConfiguration());
+
+    // Add libjars to HDFS
+    String tmpjars = job.getConfiguration().get("tmpjars");
+    LOG.debug("conf.get(tmpjars): " + tmpjars);
+
+    if (tmpjars != null) {
+      String hdfsFileUrls = DistributedCacheUtil.addFilesToHDFS(
+          job.getConfiguration(), job.getConfiguration().get("tmpjars"));
+      job.getConfiguration().set("tmpjars", hdfsFileUrls);
+
+      LOG.info("conf.get(tmpjars): " + job.getConfiguration().get("tmpjars"));
+    }
+
   }
 
   /**
@@ -354,6 +368,8 @@ public class Submitter implements Tool {
       return 1;
     }
 
+    LOG.debug("Hama pipes Submitter started!");
+
     cli.addOption("input", false, "input path for bsp", "path");
     cli.addOption("output", false, "output path from bsp", "path");
 

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.pipes.Submitter;
+
+/**
+ * This protocol is a binary implementation of the Hama Pipes protocol.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ */
+public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+    implements DownwardProtocol<K1, V1, K2, V2> {
+
+  protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
+      .getName());
+  public static final int CURRENT_PROTOCOL_VERSION = 0;
+  /**
+   * The buffer size for the command socket
+   */
+  protected static final int BUFFER_SIZE = 128 * 1024;
+
+  protected final DataOutputStream stream;
+  protected final DataOutputBuffer buffer = new DataOutputBuffer();
+
+  private UplinkReader<K1, V1, K2, V2> uplink;
+
+  public final Object hasTaskLock = new Object();
+  private boolean hasTask = false;
+  public final Object resultLock = new Object();
+  private Integer resultInt = null;
+
+  /* Protected final peer is only needed by the Streaming Protocol */
+  protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
+  private Configuration conf;
+
+  /**
+   * Create a proxy object that will speak the binary protocol on the given
+   * streams. Upward messages are read by an uplink reader thread that is
+   * started here; downward messages are public methods on this object. No
+   * peer is available with this constructor.
+   *
+   * @param conf The job's configuration
+   * @param out The output stream to communicate on.
+   * @param in The input stream to communicate on.
+   * @throws IOException
+   */
+  public BinaryProtocol(Configuration conf, OutputStream out, InputStream in)
+      throws IOException {
+    this.conf = conf;
+    this.peer = null;
+
+    // If we are debugging, save a copy of the downlink commands to a file
+    if (Submitter.getKeepCommandFile(conf)) {
+      out = new TeeOutputStream("downlink.data", out);
+    }
+    stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
+    uplink = new UplinkReader<K1, V1, K2, V2>(this, conf, in);
+
+    uplink.setName("pipe-uplink-handler");
+    uplink.start();
+  }
+
+  /**
+   * Create a proxy object that will speak the binary protocol on the given
+   * streams. Upward messages are read by an uplink reader thread that is
+   * started here; downward messages are public methods on this object.
+   *
+   * @param peer the current peer including the task's configuration
+   * @param out The output stream to communicate on.
+   * @param in The input stream to communicate on.
+   * @throws IOException
+   */
+  public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> peer,
+      OutputStream out, InputStream in) throws IOException {
+    this.peer = peer;
+    this.conf = peer.getConfiguration();
+
+    // If we are debugging, save a copy of the downlink commands to a file
+    if (Submitter.getKeepCommandFile(conf)) {
+      out = new TeeOutputStream("downlink.data", out);
+    }
+    stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
+    uplink = new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+
+    uplink.setName("pipe-uplink-handler");
+    uplink.start();
+  }
+
+  /**
+   * @return the current value of the hasTask flag
+   */
+  // NOTE(review): the setter is synchronized but this getter is not; confirm
+  // that readers on other threads are guaranteed to see the latest value.
+  public boolean isHasTask() {
+    return hasTask;
+  }
+
+  /**
+   * Sets the hasTask flag; the run* methods set it to true after sending
+   * their command.
+   *
+   * @param hasTask the new value of the flag
+   */
+  public synchronized void setHasTask(boolean hasTask) {
+    this.hasTask = hasTask;
+  }
+
+  /**
+   * Stores an integer result in this protocol object.
+   *
+   * @param result the value to store
+   */
+  public synchronized void setResult(int result) {
+    this.resultInt = result;
+  }
+
+  /**
+   * @return the buffered data stream the downward messages are written to
+   */
+  public DataOutputStream getStream() {
+    return stream;
+  }
+
+  /**
+   * An output stream that will save a copy of the data into a file. Every
+   * write goes to the file first and then to the wrapped stream.
+   */
+  private static class TeeOutputStream extends FilterOutputStream {
+    private final OutputStream file;
+
+    /**
+     * @param filename name of the file receiving the copy
+     * @param base the stream that is wrapped
+     * @throws IOException if the file cannot be opened for writing
+     */
+    TeeOutputStream(String filename, OutputStream base) throws IOException {
+      super(base);
+      file = new FileOutputStream(filename);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      file.write(b, off, len);
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      file.write(b);
+      out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      file.flush();
+      out.flush();
+    }
+
+    /**
+     * Flushes and closes both targets. Both are closed even if flushing or
+     * closing the first one fails, so neither is leaked.
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        flush();
+      } finally {
+        try {
+          file.close();
+        } finally {
+          out.close();
+        }
+      }
+    }
+  }
+
+  /* ************************************************************ */
+  /* Implementation of DownwardProtocol<K1, V1, K2, V2> */
+  /* ************************************************************ */
+
+  /**
+   * Opens the conversation with the child: sends the START message code
+   * followed by the protocol version, flushes, and then transmits the
+   * configuration held by this object.
+   *
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void start() throws IOException {
+    LOG.debug("starting downlink");
+    WritableUtils.writeVInt(stream, MessageType.START.code);
+    WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
+    flush();
+    LOG.debug("Sent MessageType.START");
+    setBSPJobConf(conf);
+  }
+
+  /**
+   * Sends the given configuration to the child: the SET_BSPJOB_CONF message
+   * code, the number of entries, then every key and value as strings.
+   *
+   * @param conf the configuration whose entries are transmitted
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void setBSPJobConf(Configuration conf) throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code);
+
+    // The entry count precedes the entries on the wire, so the entries are
+    // collected first.
+    List<Entry<String, String>> entries = new ArrayList<Entry<String, String>>();
+    for (Entry<String, String> property : conf) {
+      entries.add(property);
+    }
+
+    final int entryCount = entries.size();
+    WritableUtils.writeVInt(stream, entryCount);
+    for (int i = 0; i < entryCount; i++) {
+      Entry<String, String> property = entries.get(i);
+      Text.writeString(stream, property.getKey());
+      Text.writeString(stream, property.getValue());
+    }
+    flush();
+    LOG.debug("Sent MessageType.SET_BSPJOB_CONF including " + entryCount
+        + " entries.");
+  }
+
+  /**
+   * Sends the SET_INPUT_TYPES message carrying the two type names and
+   * flushes the stream.
+   *
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public void setInputTypes(String keyType, String valueType)
+      throws IOException {
+    final int messageCode = MessageType.SET_INPUT_TYPES.code;
+    WritableUtils.writeVInt(stream, messageCode);
+    Text.writeString(stream, keyType);
+    Text.writeString(stream, valueType);
+    flush();
+    LOG.debug("Sent MessageType.SET_INPUT_TYPES");
+  }
+
+  /**
+   * Sends the RUN_SETUP message with both flags encoded as 1 (true) or 0
+   * (false), flushes, and marks this protocol as having a running task.
+   *
+   * @param pipedInput flag forwarded to the other side as 1/0
+   * @param pipedOutput flag forwarded to the other side as 1/0
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public void runSetup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    final int inputFlag = pipedInput ? 1 : 0;
+    final int outputFlag = pipedOutput ? 1 : 0;
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code);
+    WritableUtils.writeVInt(stream, inputFlag);
+    WritableUtils.writeVInt(stream, outputFlag);
+    flush();
+
+    setHasTask(true);
+    LOG.debug("Sent MessageType.RUN_SETUP");
+  }
+
+  /**
+   * Sends the RUN_BSP message with both flags encoded as 1 (true) or 0
+   * (false), flushes, and marks this protocol as having a running task.
+   *
+   * @param pipedInput flag forwarded to the other side as 1/0
+   * @param pipedOutput flag forwarded to the other side as 1/0
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public void runBsp(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    final int inputFlag = pipedInput ? 1 : 0;
+    final int outputFlag = pipedOutput ? 1 : 0;
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code);
+    WritableUtils.writeVInt(stream, inputFlag);
+    WritableUtils.writeVInt(stream, outputFlag);
+    flush();
+
+    setHasTask(true);
+    LOG.debug("Sent MessageType.RUN_BSP");
+  }
+
+  /**
+   * Sends the RUN_CLEANUP message with both flags encoded as 1 (true) or 0
+   * (false), flushes, and marks this protocol as having a running task.
+   *
+   * @param pipedInput flag forwarded to the other side as 1/0
+   * @param pipedOutput flag forwarded to the other side as 1/0
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public void runCleanup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    final int inputFlag = pipedInput ? 1 : 0;
+    final int outputFlag = pipedOutput ? 1 : 0;
+
+    WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code);
+    WritableUtils.writeVInt(stream, inputFlag);
+    WritableUtils.writeVInt(stream, outputFlag);
+    flush();
+
+    setHasTask(true);
+    LOG.debug("Sent MessageType.RUN_CLEANUP");
+  }
+
+  /**
+   * Sends a PARTITION_REQUEST for the given record and blocks on
+   * {@code resultLock} until {@code resultInt} has been set.
+   *
+   * NOTE(review): the code that stores the answer in resultInt and notifies
+   * resultLock is not part of this method - presumably the uplink reader does
+   * so when the PARTITION_RESPONSE arrives; confirm.
+   *
+   * @param key the key of the record
+   * @param value the value of the record
+   * @param numTasks the task count sent along with the request
+   * @return the received result, or 0 if the wait was interrupted
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public int getPartition(String key, String value, int numTasks)
+      throws IOException {
+
+    WritableUtils.writeVInt(stream, MessageType.PARTITION_REQUEST.code);
+    Text.writeString(stream, key);
+    Text.writeString(stream, value);
+    WritableUtils.writeVInt(stream, numTasks);
+    flush();
+
+    // Log only a short preview of the value. An unconditional
+    // substring(0, 10) throws StringIndexOutOfBoundsException for values
+    // shorter than ten characters, so truncate only when necessary.
+    final int previewLength = 10;
+    final String preview = value.length() > previewLength ? value.substring(0,
+        previewLength) + "..." : value;
+    LOG.debug("Sent MessageType.PARTITION_REQUEST - key: " + key + " value: "
+        + preview + " numTasks: " + numTasks);
+
+    int resultVal = 0;
+
+    synchronized (resultLock) {
+      try {
+        // loop to guard against wake-ups that happen before a result is set
+        while (resultInt == null)
+          resultLock.wait();
+
+        // consume the result so that the next request waits for a fresh one
+        resultVal = resultInt;
+        resultInt = null;
+
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+    }
+    return resultVal;
+  }
+
+  /**
+   * Sends the ABORT message and flushes the stream.
+   *
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  @Override
+  public void abort() throws IOException {
+    final int abortCode = MessageType.ABORT.code;
+    WritableUtils.writeVInt(stream, abortCode);
+    flush();
+    LOG.debug("Sent MessageType.ABORT");
+  }
+
+  /**
+   * Flushes the downlink {@code stream}.
+   *
+   * @throws IOException if flushing the stream fails
+   */
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  /**
+   * Close the connection and shutdown the handler thread.
+   *
+   * The steps run in order: send the CLOSE message, interrupt and join the
+   * uplink thread, close the uplink connection, close the downlink stream.
+   * Every step is attempted even if an earlier one fails, so a failure while
+   * sending CLOSE can no longer leave the uplink thread running or the stream
+   * open. An exception from an earlier step is still propagated unless a
+   * later step fails as well.
+   *
+   * @throws IOException if sending CLOSE or closing a resource fails
+   * @throws InterruptedException if interrupted while joining the uplink
+   *           thread
+   */
+  @Override
+  public void close() throws IOException, InterruptedException {
+    // runCleanup(pipedInput,pipedOutput);
+    LOG.debug("closing connection");
+    try {
+      endOfInput();
+    } finally {
+      try {
+        uplink.interrupt();
+        uplink.join();
+      } finally {
+        try {
+          uplink.closeConnection();
+        } finally {
+          stream.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Blocks the calling thread for as long as {@code hasTask} is true, waiting
+   * on {@code hasTaskLock} between checks.
+   *
+   * An interrupt during the wait is caught and logged here rather than
+   * rethrown; the method then returns with {@code hasTask} possibly still
+   * true.
+   *
+   * NOTE(review): the code that clears hasTask and notifies hasTaskLock is not
+   * part of this method - presumably the uplink reader does so when the other
+   * side reports completion; confirm.
+   *
+   * @return the value of {@code hasTask} read after leaving the synchronized
+   *         block
+   * @throws IOException declared by the interface; not thrown by this body
+   * @throws InterruptedException declared by the interface; this body catches
+   *           the interrupt of the wait itself
+   */
+  @Override
+  public boolean waitForFinish() throws IOException, InterruptedException {
+    // LOG.debug("waitForFinish... "+hasTask);
+    synchronized (hasTaskLock) {
+      try {
+        // re-check the flag after every wake-up
+        while (hasTask)
+          hasTaskLock.wait();
+
+      } catch (InterruptedException e) {
+        // logged only; the interrupt is not propagated to the caller
+        LOG.error(e);
+      }
+    }
+
+    // read outside the lock
+    return hasTask;
+  }
+
+  /**
+   * Sends the CLOSE message and flushes the stream.
+   *
+   * @throws IOException if writing to or flushing the stream fails
+   */
+  public void endOfInput() throws IOException {
+    WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
+    flush();
+    // one debug line per message, in the same form as the other senders
+    LOG.debug("Sent MessageType.CLOSE");
+  }
+
+  /**
+   * Write the given object to the stream. If it is a Text or BytesWritable,
+   * write it directly. Otherwise, write it to a buffer and then write the
+   * length and data to the stream.
+   * 
+   * @param obj the object to write
+   * @throws IOException
+   */
+  protected void writeObject(Writable obj) throws IOException {
+    // For Text and BytesWritable, encode them directly, so that they end up
+    // in C++ as the natural translations.
+    if (obj instanceof Text) {
+      Text t = (Text) obj;
+      int len = t.getLength();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(t.getBytes(), 0, len);
+    } else if (obj instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) obj;
+      int len = b.getLength();
+      WritableUtils.writeVInt(stream, len);
+      stream.write(b.getBytes(), 0, len);
+    } else {
+      buffer.reset();
+      obj.write(buffer);
+      int length = buffer.getLength();
+      WritableUtils.writeVInt(stream, length);
+      stream.write(buffer.getData(), 0, length);
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The abstract description of the downward (from Java to C++) Pipes protocol.
+ * Most of these calls only send a message and return before the message has
+ * been processed. {@link #getPartition(String, String, int)} and
+ * {@link #waitForFinish()} return a result, so an implementation may block in
+ * them.
+ * 
+ * Adapted from Hadoop Pipes.
+ * 
+ * NOTE(review): none of the type parameters is referenced by a method declared
+ * in this interface. Presumably K1/V1 are the input key/value types and K2/V2
+ * the output key/value types - confirm against the implementations.
+ */
+public interface DownwardProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> {
+
+  /**
+   * Start communication.
+   * 
+   * @throws IOException
+   */
+  void start() throws IOException;
+
+  /**
+   * Set the BSP Job Configuration.
+   * 
+   * @param conf the configuration to send
+   * @throws IOException
+   */
+  void setBSPJobConf(Configuration conf) throws IOException;
+
+  /**
+   * Set the input types for BSP.
+   * 
+   * @param keyType the name of the key's type
+   * @param valueType the name of the value's type
+   * @throws IOException
+   */
+  void setInputTypes(String keyType, String valueType) throws IOException;
+
+  /**
+   * Request that the setup phase is run.
+   * 
+   * NOTE(review): the exact meaning of the two flags is not visible from this
+   * interface - presumably whether input/output is piped through this
+   * protocol; confirm.
+   * 
+   * @param pipedInput input flag passed along with the request
+   * @param pipedOutput output flag passed along with the request
+   * @throws IOException
+   */
+  void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Request that the bsp phase is run.
+   * 
+   * @param pipedInput input flag passed along with the request
+   * @param pipedOutput output flag passed along with the request
+   * @throws IOException
+   */
+  void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Request that the cleanup phase is run.
+   * 
+   * @param pipedInput input flag passed along with the request
+   * @param pipedOutput output flag passed along with the request
+   * @throws IOException
+   */
+  void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+  /**
+   * Ask for the partition of a key/value pair.
+   * 
+   * @param key the key of the pair
+   * @param value the value of the pair
+   * @param numTasks the number of tasks passed along with the request
+   * @return the partition reported for the pair (presumably in the range
+   *         [0, numTasks) - confirm)
+   * @throws IOException
+   */
+  int getPartition(String key, String value, int numTasks) throws IOException;
+
+  /**
+   * The task should stop as soon as possible, because something has gone wrong.
+   * 
+   * @throws IOException
+   */
+  void abort() throws IOException;
+
+  /**
+   * Flush the data through any buffers.
+   * 
+   * @throws IOException
+   */
+  void flush() throws IOException;
+
+  /**
+   * Close the connection.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void close() throws IOException, InterruptedException;
+
+  /**
+   * Wait until the current task has finished.
+   * 
+   * NOTE(review): the meaning of the returned flag is not defined here -
+   * presumably whether a task is still marked as running when the wait ends;
+   * confirm against the implementation.
+   * 
+   * @return implementation defined, see the note above
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean waitForFinish() throws IOException, InterruptedException;
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+/**
+ * The integer codes that identify the different protocol messages. These
+ * must match the C++ codes or massive confusion will result.
+ * 
+ */
+public enum MessageType {
+  START(0),
+  SET_BSPJOB_CONF(1),
+  SET_INPUT_TYPES(2),
+  RUN_SETUP(3),
+  RUN_BSP(4),
+  RUN_CLEANUP(5),
+  READ_KEYVALUE(6),
+  WRITE_KEYVALUE(7),
+  GET_MSG(8),
+  GET_MSG_COUNT(9),
+  SEND_MSG(10),
+  SYNC(11),
+  GET_ALL_PEERNAME(12),
+  GET_PEERNAME(13),
+  GET_PEER_INDEX(14),
+  GET_PEER_COUNT(15),
+  GET_SUPERSTEP_COUNT(16),
+  REOPEN_INPUT(17),
+  CLEAR(18),
+  CLOSE(19),
+  ABORT(20),
+  DONE(21),
+  TASK_DONE(22),
+  REGISTER_COUNTER(23),
+  INCREMENT_COUNTER(24),
+  SEQFILE_OPEN(25),
+  SEQFILE_READNEXT(26),
+  SEQFILE_APPEND(27),
+  SEQFILE_CLOSE(28),
+  PARTITION_REQUEST(29),
+  PARTITION_RESPONSE(30),
+  LOG(31);
+
+  /** The integer sent over the stream for this message. */
+  final int code;
+
+  MessageType(int wireCode) {
+    this.code = wireCode;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Streaming protocol that inherits from the binary protocol. Basically it
+ * writes everything as text to the peer, each command is separated by newlines.
+ * To distinguish op-codes (like SET_BSPJOB_CONF) from normal output, we use the
+ * surrounds %OP_CODE%=_possible_value.
+ * 
+ * @param <K1> input key.
+ * @param <V1> input value.
+ * @param <K2> output key.
+ * @param <V2> output value.
+ */
+public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
+    extends BinaryProtocol<K1, V1, Text, Text> {
+
+  private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("=");
+
+  private final CyclicBarrier ackBarrier = new CyclicBarrier(2);
+  private volatile boolean brokenBarrier = false;
+
+  public StreamingProtocol(BSPPeer<K1, V1, Text, Text, BytesWritable> peer,
+      OutputStream out, InputStream in) throws IOException {
+    super(peer, out, in);
+  }
+
+  public class StreamingUplinkReaderThread extends
+      UplinkReader<K1, V1, Text, Text> {
+
+    private BufferedReader reader;
+
+    public StreamingUplinkReaderThread(
+        BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream stream)
+        throws IOException {
+      super(null, peer, stream);
+      reader = new BufferedReader(new InputStreamReader(inStream));
+    }
+
+    @Override
+    public void sendMessage() throws IOException {
+      String peerLine = reader.readLine();
+      String msgLine = reader.readLine();
+      peer.send(peerLine, new BytesWritable(msgLine.getBytes()));
+    }
+
+    @Override
+    public void getMessage() throws IOException {
+      BytesWritable currentMessage = peer.getCurrentMessage();
+      if (currentMessage != null)
+        writeLine(new String(currentMessage.getBytes()));
+      else
+        writeLine("%%-1%%");
+    }
+
+    @Override
+    public void getMessageCount() throws IOException {
+      writeLine("" + peer.getNumCurrentMessages());
+    }
+
+    @Override
+    public void getSuperstepCount() throws IOException {
+      writeLine("" + peer.getSuperstepCount());
+    }
+
+    @Override
+    public void getPeerName() throws IOException {
+      int id = Integer.parseInt(reader.readLine());
+      if (id == -1)
+        writeLine(peer.getPeerName());
+      else
+        writeLine(peer.getPeerName(id));
+    }
+
+    @Override
+    public void getPeerIndex() throws IOException {
+      writeLine("" + peer.getPeerIndex());
+    }
+
+    @Override
+    public void getAllPeerNames() throws IOException {
+      writeLine("" + peer.getAllPeerNames().length);
+      for (String s : peer.getAllPeerNames()) {
+        writeLine(s);
+      }
+    }
+
+    @Override
+    public void getPeerCount() throws IOException {
+      writeLine("" + peer.getAllPeerNames().length);
+    }
+
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      peer.sync();
+      writeLine(getProtocolString(MessageType.SYNC) + "_SUCCESS");
+    }
+
+    @Override
+    public void writeKeyValue() throws IOException {
+      String key = reader.readLine();
+      String value = reader.readLine();
+      peer.write(new Text(key), new Text(value));
+    }
+
+    @Override
+    public void readKeyValue() throws IOException {
+      KeyValuePair<K1, V1> readNext = peer.readNext();
+      if (readNext == null) {
+        writeLine("%%-1%%");
+        writeLine("%%-1%%");
+      } else {
+        writeLine(readNext.getKey() + "");
+        writeLine(readNext.getValue() + "");
+      }
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+      peer.reopenInput();
+    }
+
+    @Override
+    public int readCommand() throws IOException {
+      String readLine = reader.readLine();
+      if (readLine != null && !readLine.isEmpty()) {
+        String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2);
+        split[0] = split[0].replace("%", "");
+        if (checkAcks(split))
+          return -1;
+        try {
+          int parseInt = Integer.parseInt(split[0]);
+          if (parseInt == MessageType.LOG.code) {
+            LOG.info(split[1]);
+            return -1;
+          }
+          return parseInt;
+        } catch (NumberFormatException e) {
+          e.printStackTrace();
+        }
+      } else {
+        return -1;
+      }
+      return -2;
+    }
+
+    @Override
+    protected void onError(Throwable e) {
+      super.onError(e);
+      // break the barrier if we had an error
+      ackBarrier.reset();
+      brokenBarrier = true;
+    }
+
+    private boolean checkAcks(String[] readLine) {
+      if (readLine[0].startsWith("ACK_")) {
+        try {
+          ackBarrier.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } catch (BrokenBarrierException e) {
+          e.printStackTrace();
+        }
+        return true;
+      }
+      return false;
+    }
+
+  }
+
+  @Override
+  public void start() throws IOException {
+    writeLine(MessageType.START, null);
+    writeLine("" + CURRENT_PROTOCOL_VERSION);
+    setBSPJobConf(peer.getConfiguration());
+    try {
+      ackBarrier.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (BrokenBarrierException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void setBSPJobConf(Configuration conf) throws IOException {
+    writeLine(MessageType.SET_BSPJOB_CONF, null);
+    List<String> list = new ArrayList<String>();
+    for (Map.Entry<String, String> itm : conf) {
+      list.add(itm.getKey());
+      list.add(itm.getValue());
+    }
+    writeLine(list.size());
+    for (String entry : list) {
+      writeLine(entry);
+    }
+    flush();
+  }
+
+  @Override
+  public void runSetup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_SETUP, null);
+    waitOnAck();
+  }
+
+  @Override
+  public void runBsp(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_BSP, null);
+    waitOnAck();
+  }
+
+  public void waitOnAck() {
+    try {
+      if (!brokenBarrier)
+        ackBarrier.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (BrokenBarrierException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void runCleanup(boolean pipedInput, boolean pipedOutput)
+      throws IOException {
+    writeLine(MessageType.RUN_CLEANUP, null);
+    waitOnAck();
+  }
+
+  /*
+   * @Override public UplinkReaderThread getUplinkReader( BSPPeer<K1, V1, Text,
+   * Text, BytesWritable> peer, InputStream in) throws IOException { return new
+   * StreamingUplinkReaderThread(peer, in); }
+   */
+
+  public void writeLine(int msg) throws IOException {
+    writeLine("" + msg);
+  }
+
+  public void writeLine(String msg) throws IOException {
+    stream.write((msg + "\n").getBytes());
+    stream.flush();
+  }
+
+  public void writeLine(MessageType type, String msg) throws IOException {
+    stream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n")
+        .getBytes());
+    stream.flush();
+  }
+
+  public String getProtocolString(MessageType type) {
+    return "%" + type.code + "%=";
+  }
+
+}