You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/09 03:28:06 UTC
svn commit: r1465852 [14/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/
c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/
c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/
c++/utils/m4/ core/src/main/java/org/apache/hama/...
Added: hama/trunk/c++/utils/m4/hadoop_utils.m4
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/utils/m4/hadoop_utils.m4?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/utils/m4/hadoop_utils.m4 (added)
+++ hama/trunk/c++/utils/m4/hadoop_utils.m4 Tue Apr 9 01:28:04 2013
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# hadoop_utils.m4
+
+# CHECK_INSTALL_CFLAG
+# -------------------
+# Check to see if the install program supports -C.
+# If so, use "install -C" for the headers. Otherwise, every install
+# updates the timestamps on the installed headers, which causes a recompilation
+# of any downstream libraries.
+#
+# The probe copies one scratch file onto another inside the build directory.
+# The scratch files use conftest.* names so that the probe can never overwrite
+# or delete an unrelated file that happens to be called "foo" or "bar", and the
+# output of the probe is discarded so that an install program without -C does
+# not print a confusing usage error in the middle of the configure run.
+AC_DEFUN([CHECK_INSTALL_CFLAG],[
+AC_REQUIRE([AC_PROG_INSTALL])
+AC_MSG_CHECKING([whether $INSTALL supports -C])
+rm -f conftest.inst.src conftest.inst.dst
+touch conftest.inst.src
+if $INSTALL -C conftest.inst.src conftest.inst.dst >/dev/null 2>&1; then
+  INSTALL_DATA="$INSTALL_DATA -C"
+  AC_MSG_RESULT([yes])
+else
+  AC_MSG_RESULT([no])
+fi
+rm -f conftest.inst.src conftest.inst.dst
+])
+
+# HADOOP_UTILS_SETUP
+# ------------------
+# Set up the things we need for compiling hadoop utils.
+# Both prerequisites are pulled in through AC_REQUIRE, so each one is expanded
+# at most once and before the body of any macro that requires this one:
+#   AC_GNU_SOURCE    - Autoconf macro that turns on GNU extensions
+#                      (NOTE(review): marked obsolete in newer Autoconf in
+#                      favour of AC_USE_SYSTEM_EXTENSIONS - confirm before
+#                      raising the minimum Autoconf version).
+#   AC_SYS_LARGEFILE - Autoconf macro that arranges for large-file support.
+AC_DEFUN([HADOOP_UTILS_SETUP],[
+AC_REQUIRE([AC_GNU_SOURCE])
+AC_REQUIRE([AC_SYS_LARGEFILE])
+])
+
+# USE_HADOOP_UTILS
+# ----------------
+# Declares the --with-hadoop-utils=<dir> configure option and exports the
+# chosen directory to the generated files as HADOOP_UTILS_PREFIX.
+# When the option is not given, the variable is set to the literal text
+# ${prefix} (the dollar sign is escaped), so it is resolved later rather than
+# at configure time.
+AC_DEFUN([USE_HADOOP_UTILS],[
+AC_REQUIRE([HADOOP_UTILS_SETUP])
+AC_ARG_WITH(
+  [hadoop-utils],
+  [AS_HELP_STRING([--with-hadoop-utils=<dir>],
+                  [directory to get hadoop_utils from])],
+  [HADOOP_UTILS_PREFIX="$withval"],
+  [HADOOP_UTILS_PREFIX="\${prefix}"])
+AC_SUBST([HADOOP_UTILS_PREFIX])
+])
+
+# HADOOP_PIPES_SETUP
+# ------------------
+# Verifies the build prerequisites of hadoop pipes and aborts configure with
+# an explanatory message when one is missing: the pthread.h header, the
+# pthread library (pthread_create) and the ssl library (HMAC_Init).
+#
+# Every action and every message is quoted. AC_MSG_ERROR takes an optional
+# second argument (the exit status), so an unquoted message that contains a
+# comma is split at that comma and the tail of the sentence is taken as the
+# exit status instead of being printed.
+#
+# NOTE(review): HMAC_Init is looked up in libssl here; on many systems the
+# symbol is provided by libcrypto - confirm that this probe still links on the
+# supported platforms.
+AC_DEFUN([HADOOP_PIPES_SETUP],[
+AC_CHECK_HEADERS([pthread.h], [],
+  [AC_MSG_ERROR([Please check if you have installed the pthread library])])
+AC_CHECK_LIB([pthread], [pthread_create], [],
+  [AC_MSG_ERROR([Cannot find libpthread.so, please check])])
+AC_CHECK_LIB([ssl], [HMAC_Init], [],
+  [AC_MSG_ERROR([Cannot find libssl.so, please check])])
+])
+
+# USE_HADOOP_PIPES
+# ----------------
+# Declares the --with-hadoop-pipes=<dir> configure option and exports the
+# chosen directory to the generated files as HADOOP_PIPES_PREFIX.
+# Requires USE_HADOOP_UTILS and HADOOP_PIPES_SETUP, so the utils option and the
+# pipes prerequisite checks are expanded before this macro's own body.
+# When the option is not given, the variable is set to the literal text
+# ${prefix} (the dollar sign is escaped), so it is resolved later rather than
+# at configure time.
+AC_DEFUN([USE_HADOOP_PIPES],[
+AC_REQUIRE([USE_HADOOP_UTILS])
+AC_REQUIRE([HADOOP_PIPES_SETUP])
+AC_ARG_WITH(
+  [hadoop-pipes],
+  [AS_HELP_STRING([--with-hadoop-pipes=<dir>],
+                  [directory to get hadoop pipes from])],
+  [HADOOP_PIPES_PREFIX="$withval"],
+  [HADOOP_PIPES_PREFIX="\${prefix}"])
+AC_SUBST([HADOOP_PIPES_PREFIX])
+])
Added: hama/trunk/c++/utils/missing
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/utils/missing?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/utils/missing (added)
+++ hama/trunk/c++/utils/missing Tue Apr 9 01:28:04 2013
@@ -0,0 +1,360 @@
+#! /bin/sh
+# Common stub for a few missing GNU programs while installing.
+
+scriptversion=2005-06-08.21
+
+# Copyright (C) 1996, 1997, 1999, 2000, 2002, 2003, 2004, 2005
+# Free Software Foundation, Inc.
+# Originally by Fran,cois Pinard <pi...@iro.umontreal.ca>, 1996.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+if test $# -eq 0; then
+ echo 1>&2 "Try \`$0 --help' for more information"
+ exit 1
+fi
+
+run=:
+
+# In the cases where this matters, `missing' is being run in the
+# srcdir already.
+if test -f configure.ac; then
+ configure_ac=configure.ac
+else
+ configure_ac=configure.in
+fi
+
+msg="missing on your system"
+
+case "$1" in
+--run)
+ # Try to run requested program, and just exit if it succeeds.
+ run=
+ shift
+ "$@" && exit 0
+ # Exit code 63 means version mismatch. This often happens
+ # when the user tries to use an ancient version of a tool on
+ # a file that requires a minimum version. In this case
+ # we should proceed as if the program had been absent, or
+ # if --run hadn't been passed.
+ if test $? = 63; then
+ run=:
+ msg="probably too old"
+ fi
+ ;;
+
+ -h|--h|--he|--hel|--help)
+ echo "\
+$0 [OPTION]... PROGRAM [ARGUMENT]...
+
+Handle \`PROGRAM [ARGUMENT]...' for when PROGRAM is missing, or return an
+error status if there is no known handling for PROGRAM.
+
+Options:
+ -h, --help display this help and exit
+ -v, --version output version information and exit
+ --run try to run the given command, and emulate it if it fails
+
+Supported PROGRAM values:
+ aclocal touch file \`aclocal.m4'
+ autoconf touch file \`configure'
+ autoheader touch file \`config.h.in'
+ automake touch all \`Makefile.in' files
+ bison create \`y.tab.[ch]', if possible, from existing .[ch]
+ flex create \`lex.yy.c', if possible, from existing .c
+ help2man touch the output file
+ lex create \`lex.yy.c', if possible, from existing .c
+ makeinfo touch the output file
+ tar try tar, gnutar, gtar, then tar without non-portable flags
+ yacc create \`y.tab.[ch]', if possible, from existing .[ch]
+
+Send bug reports to <bu...@gnu.org>."
+ exit $?
+ ;;
+
+ -v|--v|--ve|--ver|--vers|--versi|--versio|--version)
+ echo "missing $scriptversion (GNU Automake)"
+ exit $?
+ ;;
+
+ -*)
+ echo 1>&2 "$0: Unknown \`$1' option"
+ echo 1>&2 "Try \`$0 --help' for more information"
+ exit 1
+ ;;
+
+esac
+
+# Now exit if we have it, but it failed. Also exit now if we
+# don't have it and --version was passed (most likely to detect
+# the program).
+case "$1" in
+ lex|yacc)
+ # Not GNU programs, they don't have --version.
+ ;;
+
+ tar)
+ if test -n "$run"; then
+ echo 1>&2 "ERROR: \`tar' requires --run"
+ exit 1
+ elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+ exit 1
+ fi
+ ;;
+
+ *)
+ if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+ # We have it, but it failed.
+ exit 1
+ elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+ # Could not run --version or --help. This is probably someone
+ # running `$TOOL --version' or `$TOOL --help' to check whether
+ # $TOOL exists and not knowing $TOOL uses missing.
+ exit 1
+ fi
+ ;;
+esac
+
+# If it does not exist, or fails to run (possibly an outdated version),
+# try to emulate it.
+case "$1" in
+ aclocal*)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`acinclude.m4' or \`${configure_ac}'. You might want
+ to install the \`Automake' and \`Perl' packages. Grab them from
+ any GNU archive site."
+ touch aclocal.m4
+ ;;
+
+ autoconf)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`${configure_ac}'. You might want to install the
+ \`Autoconf' and \`GNU m4' packages. Grab them from any GNU
+ archive site."
+ touch configure
+ ;;
+
+ autoheader)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`acconfig.h' or \`${configure_ac}'. You might want
+ to install the \`Autoconf' and \`GNU m4' packages. Grab them
+ from any GNU archive site."
+ files=`sed -n 's/^[ ]*A[CM]_CONFIG_HEADER(\([^)]*\)).*/\1/p' ${configure_ac}`
+ test -z "$files" && files="config.h"
+ touch_files=
+ for f in $files; do
+ case "$f" in
+ *:*) touch_files="$touch_files "`echo "$f" |
+ sed -e 's/^[^:]*://' -e 's/:.*//'`;;
+ *) touch_files="$touch_files $f.in";;
+ esac
+ done
+ touch $touch_files
+ ;;
+
+ automake*)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`Makefile.am', \`acinclude.m4' or \`${configure_ac}'.
+ You might want to install the \`Automake' and \`Perl' packages.
+ Grab them from any GNU archive site."
+ find . -type f -name Makefile.am -print |
+ sed 's/\.am$/.in/' |
+ while read f; do touch "$f"; done
+ ;;
+
+ autom4te)
+ echo 1>&2 "\
+WARNING: \`$1' is needed, but is $msg.
+ You might have modified some files without having the
+ proper tools for further handling them.
+ You can get \`$1' as part of \`Autoconf' from any GNU
+ archive site."
+
+ file=`echo "$*" | sed -n 's/.*--output[ =]*\([^ ]*\).*/\1/p'`
+ test -z "$file" && file=`echo "$*" | sed -n 's/.*-o[ ]*\([^ ]*\).*/\1/p'`
+ if test -f "$file"; then
+ touch $file
+ else
+ test -z "$file" || exec >$file
+ echo "#! /bin/sh"
+ echo "# Created by GNU Automake missing as a replacement of"
+ echo "# $ $@"
+ echo "exit 0"
+ chmod +x $file
+ exit 1
+ fi
+ ;;
+
+ bison|yacc)
+ echo 1>&2 "\
+WARNING: \`$1' $msg. You should only need it if
+ you modified a \`.y' file. You may need the \`Bison' package
+ in order for those modifications to take effect. You can get
+ \`Bison' from any GNU archive site."
+ rm -f y.tab.c y.tab.h
+ if [ $# -ne 1 ]; then
+ eval LASTARG="\${$#}"
+ case "$LASTARG" in
+ *.y)
+ SRCFILE=`echo "$LASTARG" | sed 's/y$/c/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" y.tab.c
+ fi
+ SRCFILE=`echo "$LASTARG" | sed 's/y$/h/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" y.tab.h
+ fi
+ ;;
+ esac
+ fi
+ if [ ! -f y.tab.h ]; then
+ echo >y.tab.h
+ fi
+ if [ ! -f y.tab.c ]; then
+ echo 'main() { return 0; }' >y.tab.c
+ fi
+ ;;
+
+ lex|flex)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a \`.l' file. You may need the \`Flex' package
+ in order for those modifications to take effect. You can get
+ \`Flex' from any GNU archive site."
+ rm -f lex.yy.c
+ if [ $# -ne 1 ]; then
+ eval LASTARG="\${$#}"
+ case "$LASTARG" in
+ *.l)
+ SRCFILE=`echo "$LASTARG" | sed 's/l$/c/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" lex.yy.c
+ fi
+ ;;
+ esac
+ fi
+ if [ ! -f lex.yy.c ]; then
+ echo 'main() { return 0; }' >lex.yy.c
+ fi
+ ;;
+
+ help2man)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a dependency of a manual page. You may need the
+ \`Help2man' package in order for those modifications to take
+ effect. You can get \`Help2man' from any GNU archive site."
+
+ file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+ if test -z "$file"; then
+ file=`echo "$*" | sed -n 's/.*--output=\([^ ]*\).*/\1/p'`
+ fi
+ if [ -f "$file" ]; then
+ touch $file
+ else
+ test -z "$file" || exec >$file
+ echo ".ab help2man is required to generate this page"
+ exit 1
+ fi
+ ;;
+
+ makeinfo)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a \`.texi' or \`.texinfo' file, or any other file
+ indirectly affecting the aspect of the manual. The spurious
+ call might also be the consequence of using a buggy \`make' (AIX,
+ DU, IRIX). You might want to install the \`Texinfo' package or
+ the \`GNU make' package. Grab either from any GNU archive site."
+ # The file to touch is that specified with -o ...
+ file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+ if test -z "$file"; then
+ # ... or it is the one specified with @setfilename ...
+ infile=`echo "$*" | sed 's/.* \([^ ]*\) *$/\1/'`
+ file=`sed -n '/^@setfilename/ { s/.* \([^ ]*\) *$/\1/; p; q; }' $infile`
+ # ... or it is derived from the source name (dir/f.texi becomes f.info)
+ test -z "$file" && file=`echo "$infile" | sed 's,.*/,,;s,.[^.]*$,,'`.info
+ fi
+ # If the file does not exist, the user really needs makeinfo;
+ # let's fail without touching anything.
+ test -f $file || exit 1
+ touch $file
+ ;;
+
+ tar)
+ shift
+
+ # We have already tried tar in the generic part.
+ # Look for gnutar/gtar before invocation to avoid ugly error
+ # messages.
+ if (gnutar --version > /dev/null 2>&1); then
+ gnutar "$@" && exit 0
+ fi
+ if (gtar --version > /dev/null 2>&1); then
+ gtar "$@" && exit 0
+ fi
+ firstarg="$1"
+ if shift; then
+ case "$firstarg" in
+ *o*)
+ firstarg=`echo "$firstarg" | sed s/o//`
+ tar "$firstarg" "$@" && exit 0
+ ;;
+ esac
+ case "$firstarg" in
+ *h*)
+ firstarg=`echo "$firstarg" | sed s/h//`
+ tar "$firstarg" "$@" && exit 0
+ ;;
+ esac
+ fi
+
+ echo 1>&2 "\
+WARNING: I can't seem to be able to run \`tar' with the given arguments.
+ You may want to install GNU tar or Free paxutils, or check the
+ command line arguments."
+ exit 1
+ ;;
+
+ *)
+ echo 1>&2 "\
+WARNING: \`$1' is needed, and is $msg.
+ You might have modified some files without having the
+ proper tools for further handling them. Check the \`README' file,
+ it often tells you about the needed prerequisites for installing
+ this package. You may also peek at any GNU archive site, in case
+ some other package would contain this missing \`$1' program."
+ exit 1
+ ;;
+esac
+
+exit 0
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Apr 9 01:28:04 2013
@@ -23,6 +23,7 @@ import java.net.URLDecoder;
import java.util.Enumeration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -31,6 +32,13 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.pipes.PipesApplicable;
+import org.apache.hama.pipes.PipesApplication;
+import org.apache.hama.pipes.PipesPartitioner;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
+
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
/**
* A BSP job configuration.
@@ -47,6 +55,9 @@ public class BSPJob extends BSPJobContex
private BSPJobClient jobClient;
private RunningJob info;
+ private PipesApplication<?, ?, ?, ?, ?> pipesApp = null;
+ private static final Log LOG = LogFactory.getLog(BSPJob.class);
+
public BSPJob() throws IOException {
this(new HamaConfiguration());
}
@@ -241,6 +252,31 @@ public class BSPJob extends BSPJobContex
conf.set(name, value);
}
+ /**
+  * Stores a boolean property in this job's configuration.
+  *
+  * @param name the property name
+  * @param value the value to record under {@code name}
+  */
+ public void setBoolean(String name, boolean value) {
+   this.conf.setBoolean(name, value);
+ }
+
+ /**
+  * Looks up a boolean property in this job's configuration.
+  *
+  * @param name the property name
+  * @param defaultValue the fallback handed to the configuration lookup
+  * @return whatever the underlying configuration returns for {@code name}
+  */
+ public boolean getBoolean(String name, boolean defaultValue) {
+   final boolean configured = this.conf.getBoolean(name, defaultValue);
+   return configured;
+ }
+
+ /**
+  * Returns the {@link PipesApplication} attached to this job, creating it on
+  * the first call and handing back the same instance afterwards.
+  * <p>
+  * The lazy creation is not synchronized.
+  *
+  * @return the job's pipes application, never {@code null}
+  */
+ public final <K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> getPipesApplication() {
+   if (pipesApp != null) {
+     return pipesApp;
+   }
+   pipesApp = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
+   return pipesApp;
+ }
+
+ public void cleanup() {
+ try {
+ // Close client pipesApplication
+ if (this.getPipesApplication() != null)
+ this.getPipesApplication().cleanup();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+
public void setNumBspTask(int tasks) {
conf.setInt("bsp.peers.num", tasks);
}
@@ -381,9 +417,32 @@ public class BSPJob extends BSPJobContex
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return ReflectionUtils.newInstance(conf.getClass(
+
+ Class<? extends Partitioner> partitionerClass = conf.getClass(
Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
- Partitioner.class), conf);
+ Partitioner.class);
+
+ LOG.info("DEBUG: " + Constants.RUNTIME_PARTITIONING_CLASS + ": "
+ + partitionerClass.toString());
+
+ Partitioner partitioner = ReflectionUtils.newInstance(partitionerClass,
+ conf);
+
+ /* PipesPartitioner usage */
+ if (PipesPartitioner.class.equals(partitionerClass)) {
+ ((PipesApplicable) partitioner)
+ .setApplication(this.getPipesApplication());
+
+ try {
+ DistributedCacheUtil.moveLocalFiles(conf);
+ this.getPipesApplication().start(conf);
+ } catch (IOException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ return partitioner;
}
@SuppressWarnings("rawtypes")
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Apr 9 01:28:04 2013
@@ -769,6 +769,9 @@ public class BSPJobClient extends Config
// TODO if error found, kill job
// running.killJob();
jc.close();
+
+ // Added cleanup for Client PipesApp and DistributedCache
+ job.cleanup();
}
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 9 01:28:04 2013
@@ -19,7 +19,8 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -28,7 +29,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -47,6 +47,7 @@ import org.apache.hama.bsp.sync.PeerSync
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
import org.apache.hama.util.KeyValuePair;
/**
@@ -242,45 +243,6 @@ public final class BSPPeerImpl<K1, V1, K
}
}
- /**
- * Transfers DistributedCache files into the local cache files. Also creates
- * symbolic links for URIs specified with a fragment if
- * DistributedCache.getSymlinks() is true.
- *
- * @throws IOException If a DistributedCache file cannot be found.
- */
- public final void moveCacheFiles() throws IOException {
- StringBuilder files = new StringBuilder();
- boolean first = true;
- if (DistributedCache.getCacheFiles(conf) != null) {
- for (URI uri : DistributedCache.getCacheFiles(conf)) {
- if (uri != null) {
- if (!first) {
- files.append(",");
- }
- if (null != uri.getFragment() && DistributedCache.getSymlink(conf)) {
-
- FileUtil.symLink(uri.getPath(), uri.getFragment());
- files.append(uri.getFragment()).append(",");
- }
- FileSystem hdfs = FileSystem.get(conf);
- Path pathSrc = new Path(uri.getPath());
- if (hdfs.exists(pathSrc)) {
- LocalFileSystem local = FileSystem.getLocal(conf);
- Path pathDst = new Path(local.getWorkingDirectory(),
- pathSrc.getName());
- hdfs.copyToLocalFile(pathSrc, pathDst);
- files.append(pathDst.toUri().getPath());
- }
- first = false;
- }
- }
- }
- if (files.length() > 0) {
- DistributedCache.addLocalFiles(conf, files.toString());
- }
- }
-
@SuppressWarnings("unchecked")
public final void initInput() throws IOException {
InputSplit inputSplit = null;
@@ -370,12 +332,21 @@ public final class BSPPeerImpl<K1, V1, K
}
};
+ /* Move Files to HDFS */
try {
- moveCacheFiles();
+ DistributedCacheUtil.moveLocalFiles(this.conf);
} catch (Exception e) {
LOG.error(e);
}
+ /* Add additional jars to Classpath */
+ // LOG.info("conf.get(tmpjars): " + this.conf.get("tmpjars"));
+ URL[] libjars = DistributedCacheUtil.addJarsToJobClasspath(this.conf);
+
+ // ATTENTION bspJob.getConf() != this.conf
+ if (libjars != null)
+ bspJob.conf.setClassLoader(new URLClassLoader(libjars, bspJob.conf
+ .getClassLoader()));
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Apr 9 01:28:04 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.pipes.PipesApplicable;
+import org.apache.hama.pipes.PipesBSP;
/**
* Base class for tasks.
@@ -156,10 +158,17 @@ public final class BSPTask extends Task
final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
throws Exception {
+ Class<?> workClass = job.getConfiguration().getClass("bsp.work.class",
+ BSP.class);
+
BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
- .newInstance(
- job.getConfiguration().getClass("bsp.work.class", BSP.class),
- job.getConfiguration());
+ .newInstance(workClass, job.getConfiguration());
+
+ LOG.debug("bsp.work.class: " + workClass.toString());
+
+ /* Setup PipesApplication if workClass is matching */
+ if (PipesBSP.class.equals(workClass))
+ ((PipesApplicable) bsp).setApplication(job.getPipesApplication());
// The policy is to throw the first exception and log the remaining.
Exception firstException = null;
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueLineRecordReader.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a
+ * separator character. The separator can be specified in config file under the
+ * attribute name key.value.separator.in.input.line. The default separator is
+ * the tab character ('\t').
+ * <p>
+ * Only the first character of the configured value is used, and it is
+ * narrowed to a single byte before lines are scanned. An empty value keeps
+ * the default separator.
+ */
+public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
+
+  /** Configuration attribute that holds the separator character. */
+  private static final String SEPARATOR_KEY = "key.value.separator.in.input.line";
+
+  /** Reader that delivers the raw lines which are split into key and value. */
+  private final LineRecordReader lineRecordReader;
+
+  /** Byte that ends the key part of a line. */
+  private byte separator = (byte) '\t';
+
+  /** Receives the key of the wrapped line reader; its content is not used. */
+  private final LongWritable dummyKey;
+
+  /** Receives the complete line read by the wrapped line reader. */
+  private final Text innerValue;
+
+  /**
+   * @return the class of the keys produced by this reader, {@link Text}
+   */
+  @SuppressWarnings("rawtypes")
+  public Class getKeyClass() {
+    return Text.class;
+  }
+
+  /**
+   * @return a new, empty key object
+   */
+  public Text createKey() {
+    return new Text();
+  }
+
+  /**
+   * @return a new, empty value object
+   */
+  public Text createValue() {
+    return new Text();
+  }
+
+  /**
+   * Creates a reader over the given split.
+   *
+   * @param job configuration that is passed to the line reader and from which
+   *          the separator is read
+   * @param split the part of the file to read
+   * @throws IOException if the underlying line reader cannot be created
+   */
+  public KeyValueLineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+
+    lineRecordReader = new LineRecordReader(job, split);
+    dummyKey = lineRecordReader.createKey();
+    innerValue = lineRecordReader.createValue();
+    String sepStr = job.get(SEPARATOR_KEY, "\t");
+    // An attribute that is present but empty has no first character; keep the
+    // tab default instead of failing with StringIndexOutOfBoundsException.
+    if (sepStr != null && sepStr.length() > 0) {
+      this.separator = (byte) sepStr.charAt(0);
+    }
+  }
+
+  /**
+   * Searches a byte range for the first occurrence of a separator byte.
+   *
+   * @param utf the bytes to search
+   * @param start index of the first byte to inspect
+   * @param length number of bytes to inspect
+   * @param sep the byte to look for
+   * @return the index of the first match within {@code utf}, or -1 if the
+   *         range does not contain {@code sep}
+   */
+  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
+    final int end = start + length;
+    for (int i = start; i < end; i++) {
+      if (utf[i] == sep) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Read key/value pair in a line. Without a separator in the line, the whole
+   * line becomes the key and the value is set to the empty string.
+   *
+   * @param key receives the bytes in front of the first separator
+   * @param value receives the bytes behind the first separator
+   * @return {@code false} when the underlying line reader has no further
+   *         line, {@code true} otherwise
+   * @throws IOException if reading the next line fails
+   */
+  public synchronized boolean next(Text key, Text value) throws IOException {
+    if (!lineRecordReader.next(dummyKey, innerValue)) {
+      return false;
+    }
+    byte[] line = innerValue.getBytes();
+    if (line == null) {
+      return false;
+    }
+    // Only the first getLength() bytes of the array belong to the line.
+    int lineLen = innerValue.getLength();
+    int pos = findSeparator(line, 0, lineLen, this.separator);
+    if (pos == -1) {
+      key.set(line, 0, lineLen);
+      value.set("");
+    } else {
+      // Copy both ranges straight out of the line buffer; no temporary
+      // arrays are needed.
+      key.set(line, 0, pos);
+      value.set(line, pos + 1, lineLen - pos - 1);
+    }
+    return true;
+  }
+
+  /**
+   * @return the progress reported by the underlying line reader
+   */
+  public float getProgress() {
+    return lineRecordReader.getProgress();
+  }
+
+  /**
+   * @return the position reported by the underlying line reader
+   * @throws IOException if the underlying line reader fails
+   */
+  public synchronized long getPos() throws IOException {
+    return lineRecordReader.getPos();
+  }
+
+  /**
+   * Closes the underlying line reader.
+   *
+   * @throws IOException if closing fails
+   */
+  public synchronized void close() throws IOException {
+    lineRecordReader.close();
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/KeyValueTextInputFormat.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either a linefeed or a carriage return signals the end of a line. Each line
+ * is divided into a key part and a value part by a separator byte. If a line
+ * contains no such byte, the key is the entire line and the value is empty.
+ */
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
+
+  /**
+   * Creates a {@link KeyValueLineRecordReader} for the given split.
+   *
+   * @param genericSplit the split to read; it is cast to {@link FileSplit}
+   * @param job the job whose configuration is handed to the reader
+   * @return a reader that yields one key/value pair per line
+   * @throws IOException if the reader cannot be created
+   */
+  @Override
+  public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
+      BSPJob job) throws IOException {
+    final FileSplit fileSplit = (FileSplit) genericSplit;
+    return new KeyValueLineRecordReader(job.conf, fileSplit);
+  }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Apr 9 01:28:04 2013
@@ -22,7 +22,9 @@ import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -46,10 +48,26 @@ public class TaskLog {
}
}
+ /*
+ * Get LogFile by taskid and distinguish between log and error extension
+ */
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
// TODO clean up the log path and type.
return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString()
- + ".log");
+ + ((filter == LogName.STDERR) ? ".err" : ".log"));
+ }
+
+ /**
+  * Get LogFile by stringPattern and distinguish between log and error
+  * extension. The result is
+  * {@code LOG_DIR/job_<timestamp>/local_<timestamp>.log}, or {@code .err}
+  * when {@code filter} is {@link LogName#STDERR}.
+  *
+  * @param filter selects the file extension
+  * @param stringPattern a {@link SimpleDateFormat} pattern used to render the
+  *          current time
+  * @return the log file location; nothing is created on disk
+  */
+ public static File getLocalTaskLogFile(LogName filter, String stringPattern) {
+   // TODO clean up the log path and type.
+   // Render a single instant once, so that the directory name and the file
+   // name always carry the same timestamp even if the clock ticks in between.
+   String timestamp = new SimpleDateFormat(stringPattern).format(new Date());
+   String extension = (filter == LogName.STDERR) ? ".err" : ".log";
+   return new File(LOG_DIR, "job_" + timestamp + "/" + "local_" + timestamp
+       + extension);
+ }
/**
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Contract for classes that need access to the {@link PipesApplication}
+ * object, e.g., PipesBSP and PipesPartitioner.
+ */
+public interface PipesApplicable {
+
+  /**
+   * Hands the pipes application to the implementing object.
+   *
+   * @param pipesApp the application the implementor should work with
+   */
+  void setApplication(
+      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp);
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** MODIFIED FOR GPGPU Usage! **/
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskLog;
+import org.apache.hama.pipes.protocol.BinaryProtocol;
+import org.apache.hama.pipes.protocol.DownwardProtocol;
+import org.apache.hama.pipes.protocol.StreamingProtocol;
+
+/**
+ * This class is responsible for launching and communicating with the child
+ * process.
+ *
+ * Adapted from Hadoop Pipes.
+ *
+ */
+public class PipesApplication<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
+
+ private static final Log LOG = LogFactory.getLog(PipesApplication.class
+ .getName());
+ private ServerSocket serverSocket;
+ private Process process;
+ private Socket clientSocket;
+
+ private DownwardProtocol<K1, V1, K2, V2> downlink;
+ private boolean streamingEnabled = false;
+
+ static final boolean WINDOWS = System.getProperty("os.name").startsWith(
+ "Windows");
+
+ public PipesApplication() {
+ }
+
+ /* Build Environment based on the Configuration */
+ private Map<String, String> setupEnvironment(Configuration conf)
+ throws IOException {
+
+ Map<String, String> env = new HashMap<String, String>();
+
+ this.streamingEnabled = conf.getBoolean("hama.streaming.enabled", false);
+
+ if (!this.streamingEnabled) {
+ serverSocket = new ServerSocket(0);
+ env.put("hama.pipes.command.port",
+ Integer.toString(serverSocket.getLocalPort()));
+ }
+
+ // add TMPDIR environment variable with the value of java.io.tmpdir
+ env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
+
+ /* Set Logging Environment from Configuration */
+ env.put("hama.pipes.logging",
+ conf.getBoolean("hama.pipes.logging", false) ? "1" : "0");
+ LOG.debug("DEBUG hama.pipes.logging: "
+ + conf.getBoolean("hama.pipes.logging", false));
+
+ return env;
+ }
+
+ /* Build a Command String based on the Configuration */
+ private List<String> setupCommand(Configuration conf) throws IOException,
+ InterruptedException {
+ List<String> cmd = new ArrayList<String>();
+ String interpretor = conf.get("hama.pipes.executable.interpretor");
+ if (interpretor != null) {
+ cmd.add(interpretor);
+ }
+
+ String executable = null;
+ try {
+ if (DistributedCache.getLocalCacheFiles(conf) != null) {
+ LOG.debug("DEBUG LocalCacheFilesCount: "
+ + DistributedCache.getLocalCacheFiles(conf).length);
+ for (Path u : DistributedCache.getLocalCacheFiles(conf))
+ LOG.debug("DEBUG LocalCacheFiles: " + u);
+
+ executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
+ LOG.debug("DEBUG: executable: " + executable);
+ }
+ } catch (Exception e) {
+ LOG.error("Executable: " + executable + " fs.default.name: "
+ + conf.get("fs.default.name"));
+ throw new IOException("Executable is missing!", e); // keep the root cause
+ }
+ if (executable == null) { // no local cache files: fail cleanly, not with an NPE
+ throw new IOException("Executable is missing!");
+ }
+
+ if (!new File(executable).canExecute()) {
+ // LinuxTaskController sets +x permissions on all distcache files already.
+ // In case of DefaultTaskController, set permissions here.
+ FileUtil.chmod(executable, "u+x");
+ }
+
+ cmd.add(executable);
+
+ String additionalArgs = conf.get("hama.pipes.executable.args");
+ // if true, we are resolving filenames with the linked paths in
+ // DistributedCache
+ boolean resolveArguments = conf.getBoolean(
+ "hama.pipes.resolve.executable.args", false);
+ if (additionalArgs != null && !additionalArgs.isEmpty()) {
+ String[] split = additionalArgs.split(" ");
+ for (String s : split) {
+ if (resolveArguments) {
+ for (Path u : DistributedCache.getLocalCacheFiles(conf)) {
+ if (u.getName().equals(s)) {
+ LOG.info("Resolved argument \"" + s
+ + "\" with fully qualified path \"" + u.toString() + "\"!");
+ cmd.add(u.toString());
+ break;
+ }
+ }
+ } else {
+ cmd.add(s);
+ }
+ }
+ }
+
+ return cmd;
+ }
+
+ /* If Parent File/Folder doesn't exist, build one. */
+ private void checkParentFile(File file) {
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdirs();
+ LOG.debug("File: " + file.getParentFile().getAbsolutePath() + " created!");
+ }
+ }
+
+ /**
+ * Start the child process to handle the task for us. Peer is not available
+ * now, start child by using only the configuration! e.g., PipesPartitioner,
+ * no peer is available at this time!
+ *
+ * @param conf task's configuration
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void start(Configuration conf) throws IOException,
+ InterruptedException {
+
+ Map<String, String> env = setupEnvironment(conf);
+ List<String> cmd = setupCommand(conf);
+
+ // wrap the command in a stdout/stderr capture
+ File stdout = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDOUT,
+ "yyyyMMdd'_partitioner_'HHmmss");
+ File stderr = TaskLog.getLocalTaskLogFile(TaskLog.LogName.STDERR,
+ "yyyyMMdd'_partitioner_'HHmmss");
+
+ // Get the desired maximum length of task's logs.
+ long logLength = TaskLog.getTaskLogLength(conf);
+
+ if (!streamingEnabled) {
+ cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+ } else {
+ // use tee in streaming to get the output to file
+ cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+ }
+
+ /* Check if Parent folders for STDOUT exist */
+ checkParentFile(stdout);
+ LOG.debug("STDOUT: " + stdout.getAbsolutePath());
+ /* Check if Parent folders for STDERR exist */
+ checkParentFile(stderr);
+ LOG.debug("STDERR: " + stderr.getAbsolutePath());
+
+ LOG.debug("DEBUG: cmd: " + cmd);
+ process = runClient(cmd, env); // fork c++ binary
+
+ // serverSocket only exists when streaming is disabled (see
+ // setupEnvironment); touching it outside the guard below would throw a
+ // NullPointerException in streaming mode.
+ try {
+ if (!streamingEnabled) {
+ LOG.debug("DEBUG: waiting for Client at "
+ + serverSocket.getLocalSocketAddress());
+ serverSocket.setSoTimeout(2000);
+ clientSocket = serverSocket.accept();
+ LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
+ downlink = new BinaryProtocol<K1, V1, K2, V2>(conf,
+ clientSocket.getOutputStream(), clientSocket.getInputStream());
+ downlink.start();
+ }
+ } catch (SocketException e) {
+ // NOTE(review): an accept() timeout raises SocketTimeoutException, which is
+ // not a SocketException, so timeouts may bypass this handler - confirm.
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(stderr)));
+ try {
+ String inputLine;
+ while ((inputLine = br.readLine()) != null) {
+ LOG.error("PipesApp Error: " + inputLine);
+ }
+ } finally {
+ br.close(); // release the file handle even if readLine fails
+ }
+ throw new SocketException(
+ "Timout: Client pipes application was not connecting!");
+ }
+ }
+
+ /**
+ * Start the child process to handle the task for us.
+ *
+ * @param peer the current peer including the task's configuration
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void start(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
+ throws IOException, InterruptedException {
+
+ Map<String, String> env = setupEnvironment(peer.getConfiguration());
+ List<String> cmd = setupCommand(peer.getConfiguration());
+
+ // wrap the command in a stdout/stderr capture
+ TaskAttemptID taskid = peer.getTaskId();
+ File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+
+ // Get the desired maximum length of task's logs.
+ long logLength = TaskLog.getTaskLogLength(peer.getConfiguration());
+
+ if (!streamingEnabled) {
+ cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength);
+ } else {
+ // use tee in streaming to get the output to file
+ cmd = TaskLog.captureOutAndErrorTee(null, cmd, stdout, stderr, logLength);
+ }
+
+ /* Check if Parent folders for STDOUT exist */
+ checkParentFile(stdout);
+ LOG.debug("STDOUT: " + stdout.getAbsolutePath());
+ /* Check if Parent folders for STDERR exist */
+ checkParentFile(stderr);
+ LOG.debug("STDERR: " + stderr.getAbsolutePath());
+
+ LOG.debug("DEBUG: cmd: " + cmd);
+ process = runClient(cmd, env); // fork c++ binary
+
+ try {
+ if (streamingEnabled) {
+ downlink = new StreamingProtocol(peer, process.getOutputStream(),
+ process.getInputStream());
+ } else {
+ LOG.debug("DEBUG: waiting for Client at "
+ + serverSocket.getLocalSocketAddress());
+ serverSocket.setSoTimeout(2000);
+ clientSocket = serverSocket.accept();
+ LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
+ downlink = new BinaryProtocol<K1, V1, K2, V2>(peer,
+ clientSocket.getOutputStream(), clientSocket.getInputStream());
+ }
+ downlink.start();
+ } catch (SocketException e) {
+ // NOTE(review): an accept() timeout raises SocketTimeoutException, which is
+ // not a SocketException, so timeouts may bypass this handler - confirm.
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(stderr)));
+ try {
+ String inputLine;
+ while ((inputLine = br.readLine()) != null) {
+ LOG.error("PipesApp Error: " + inputLine);
+ }
+ } finally {
+ br.close(); // release the file handle even if readLine fails
+ }
+ throw new SocketException(
+ "Timout: Client pipes application was not connecting!");
+ }
+ }
+
+ /**
+ * Get the downward protocol object that can send commands down to the
+ * application.
+ *
+ * @return the downlink proxy
+ */
+ DownwardProtocol<K1, V1, K2, V2> getDownlink() {
+ return downlink;
+ }
+
+ /**
+ * Wait for the application to finish
+ *
+ * @return did the application finish correctly?
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ boolean waitForFinish() throws InterruptedException, IOException {
+ downlink.flush();
+ return downlink.waitForFinish();
+ }
+
+ /**
+ * Abort the application and wait for it to finish.
+ *
+ * @param t the exception that signalled the problem
+ * @throws IOException A wrapper around the exception that was passed in
+ */
+ void abort(Throwable t) throws IOException {
+ LOG.info("Aborting because of " + StringUtils.stringifyException(t));
+ try {
+ downlink.abort();
+ downlink.flush();
+ } catch (IOException e) {
+ // IGNORE cleanup problems
+ }
+ try {
+ downlink.waitForFinish();
+ } catch (Throwable ignored) {
+ process.destroy();
+ }
+ IOException wrapper = new IOException("pipe child exception");
+ wrapper.initCause(t);
+ throw wrapper;
+ }
+
+ /**
+ * Clean up the child process and socket if exist.
+ *
+ * @throws IOException
+ */
+ public void cleanup() throws IOException {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ try {
+ if (downlink != null) {
+ downlink.close();
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Run a given command in a subprocess, including threads to copy its stdout
+ * and stderr to our stdout and stderr.
+ *
+ * @param command the command and its arguments
+ * @param env the environment to run the process in
+ * @return a handle on the process
+ * @throws IOException
+ */
+ static Process runClient(List<String> command, Map<String, String> env)
+ throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(command);
+ if (env != null) {
+ builder.environment().putAll(env);
+ }
+ Process result = builder.start();
+ return result;
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Tue Apr 9 01:28:04 2013
@@ -32,26 +32,53 @@ import org.apache.hama.bsp.sync.SyncExce
* runtimes.
*/
public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
- extends BSP<K1, V1, K2, V2, BytesWritable> {
+ extends BSP<K1, V1, K2, V2, BytesWritable> implements PipesApplicable {
private static final Log LOG = LogFactory.getLog(PipesBSP.class);
- private Application<K1, V1, K2, V2, BytesWritable> application;
+ private PipesApplication<K1, V1, K2, V2, BytesWritable> application;
@Override
public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
throws IOException, SyncException, InterruptedException {
- this.application = new Application<K1, V1, K2, V2, BytesWritable>(peer);
- application.getDownlink().runSetup(false, false);
+ this.application.start(peer);
+
+ this.application.getDownlink().runSetup(false, false);
+
+ try {
+ this.application.waitForFinish();
+ } catch (IOException e) {
+ LOG.error(e);
+ throw e;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
@Override
public void bsp(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
throws IOException, SyncException, InterruptedException {
- application.getDownlink().runBsp(false, false);
+ this.application.getDownlink().runBsp(false, false);
+
+ try {
+ this.application.waitForFinish();
+ } catch (IOException e) {
+ LOG.error(e);
+ throw e;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
+ /**
+ * This method is called after the BSP method. It can be used for cleanup
+ * purposes. Cleanup is guaranteed to be called after the BSP runs, even in
+ * case of exceptions.
+ *
+ * @param peer Your BSPPeer instance.
+ * @throws IOException
+ */
@Override
public void cleanup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
throws IOException {
@@ -59,15 +86,23 @@ public class PipesBSP<K1 extends Writabl
application.getDownlink().runCleanup(false, false);
try {
- application.waitForFinish();
+ this.application.waitForFinish();
} catch (IOException e) {
LOG.error(e);
throw e;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
- application.cleanup();
+ this.application.cleanup();
}
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setApplication(
+ PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
+
+ this.application = (PipesApplication<K1, V1, K2, V2, BytesWritable>) pipesApp;
+ }
+
}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.Partitioner;
+
+/**
+ *
+ * PipesPartitioner is a wrapper for the C++ partitioner: Java Partitioner ->
+ * BinaryProtocol -> C++ Partitioner and back.
+ *
+ */
+public class PipesPartitioner<K, V> implements Partitioner<K, V>,
+ PipesApplicable {
+
+ private static final Log LOG = LogFactory.getLog(PipesPartitioner.class
+ .getName());
+ private PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> application = null;
+
+ /**
+ * Partitions a specific key value mapping to a bucket.
+ *
+ * @param key
+ * @param value
+ * @param numTasks
+ * @return a number between 0 and numTasks (exclusive) that tells which
+ * partition it belongs to.
+ */
+ @Override
+ public int getPartition(K key, V value, int numTasks) {
+ int returnVal = 0;
+ try {
+ // LOG.info("pipesApp==null: " + ((pipesApp == null) ? "true" : "false"));
+ // LOG.info("pipesApp.getDownlink()==null: "
+ // + ((pipesApp.getDownlink() == null) ? "true" : "false"));
+
+ // LOG.info("Class: "+value.getClass().toString());
+ if ((application != null) && (application.getDownlink() != null))
+ returnVal = application.getDownlink().getPartition(key.toString(),
+ value.toString(), numTasks);
+
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ return returnVal;
+ }
+
+ @Override
+ public void setApplication(
+ PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
+
+ this.application = pipesApp;
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Tue Apr 9 01:28:04 2013
@@ -52,6 +52,7 @@ import org.apache.hama.bsp.HashPartition
import org.apache.hama.bsp.InputFormat;
import org.apache.hama.bsp.OutputFormat;
import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.pipes.util.DistributedCacheUtil;
import com.google.common.base.Joiner;
@@ -277,6 +278,19 @@ public class Submitter implements Tool {
throw ie;
}
DistributedCache.setCacheFiles(fileCache, job.getConfiguration());
+
+ // Add libjars to HDFS
+ String tmpjars = job.getConfiguration().get("tmpjars");
+ LOG.debug("conf.get(tmpjars): " + tmpjars);
+
+ if (tmpjars != null) {
+ String hdfsFileUrls = DistributedCacheUtil.addFilesToHDFS(
+ job.getConfiguration(), job.getConfiguration().get("tmpjars"));
+ job.getConfiguration().set("tmpjars", hdfsFileUrls);
+
+ LOG.info("conf.get(tmpjars): " + job.getConfiguration().get("tmpjars"));
+ }
+
}
/**
@@ -354,6 +368,8 @@ public class Submitter implements Tool {
return 1;
}
+ LOG.debug("Hama pipes Submitter started!");
+
cli.addOption("input", false, "input path for bsp", "path");
cli.addOption("output", false, "output path from bsp", "path");
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.pipes.Submitter;
+
+/**
+ * This protocol is a binary implementation of the Hama Pipes protocol.
+ *
+ * Adapted from Hadoop Pipes.
+ *
+ */
+public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+ implements DownwardProtocol<K1, V1, K2, V2> {
+
+ protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
+ .getName());
+ public static final int CURRENT_PROTOCOL_VERSION = 0;
+ /**
+ * The buffer size for the command socket
+ */
+ protected static final int BUFFER_SIZE = 128 * 1024;
+
+ protected final DataOutputStream stream;
+ protected final DataOutputBuffer buffer = new DataOutputBuffer();
+
+ private UplinkReader<K1, V1, K2, V2> uplink;
+
+ public final Object hasTaskLock = new Object();
+ private boolean hasTask = false;
+ public final Object resultLock = new Object();
+ private Integer resultInt = null;
+
+ /* Protected final peer is only needed by the Streaming Protocol */
+ protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
+ private Configuration conf;
+
+ /**
+ * Create a proxy object that will speak the binary protocol on a socket.
+ * Upward messages are passed on the specified handler and downward
+ * messages are public methods on this object.
+ * @param conf The job's configuration
+ * @param out The output stream to communicate on.
+ * @param in The input stream to communicate on.
+ * @throws IOException
+ */
+ public BinaryProtocol(Configuration conf, OutputStream out, InputStream in)
+ throws IOException {
+ this.conf = conf;
+ this.peer = null;
+
+ // If we are debugging, save a copy of the downlink commands to a file
+ if (Submitter.getKeepCommandFile(conf)) {
+ out = new TeeOutputStream("downlink.data", out);
+ }
+ stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
+ uplink = new UplinkReader<K1, V1, K2, V2>(this, conf, in);
+
+ uplink.setName("pipe-uplink-handler");
+ uplink.start();
+ }
+
+ /**
+ * Create a proxy object that will speak the binary protocol on a socket.
+ * Upward messages are passed on the specified handler and downward
+ * messages are public methods on this object.
+ *
+ * @param peer the current peer including the task's configuration
+ * @param out The output stream to communicate on.
+ * @param in The input stream to communicate on.
+ * @throws IOException
+ */
+ public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> peer,
+ OutputStream out, InputStream in) throws IOException {
+ this.peer = peer;
+ this.conf = peer.getConfiguration();
+
+ // If we are debugging, save a copy of the downlink commands to a file
+ if (Submitter.getKeepCommandFile(conf)) {
+ out = new TeeOutputStream("downlink.data", out);
+ }
+ stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
+ uplink = new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+
+ uplink.setName("pipe-uplink-handler");
+ uplink.start();
+ }
+
+ public boolean isHasTask() {
+ return hasTask;
+ }
+
+ public synchronized void setHasTask(boolean hasTask) {
+ this.hasTask = hasTask;
+ }
+
+ public synchronized void setResult(int result) {
+ this.resultInt = result;
+ }
+
+ public DataOutputStream getStream() {
+ return stream;
+ }
+
+ /**
+ * An output stream that will save a copy of the data into a file.
+ */
+ private static class TeeOutputStream extends FilterOutputStream {
+ private OutputStream file;
+
+ TeeOutputStream(String filename, OutputStream base) throws IOException {
+ super(base);
+ file = new FileOutputStream(filename);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ file.write(b, off, len);
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ file.write(b);
+ out.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ file.flush();
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ file.close();
+ out.close();
+ }
+ }
+
+ /* ************************************************************ */
+ /* Implementation of DownwardProtocol<K1, V1, K2, V2> */
+ /* ************************************************************ */
+
+ @Override
+ public void start() throws IOException {
+ LOG.debug("starting downlink");
+ WritableUtils.writeVInt(stream, MessageType.START.code);
+ WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
+ flush();
+ LOG.debug("Sent MessageType.START");
+ setBSPJobConf(conf);
+ }
+
+ @Override
+ public void setBSPJobConf(Configuration conf) throws IOException {
+ WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code);
+ List<Entry<String, String>> list = new ArrayList<Entry<String, String>>();
+ for (Entry<String, String> entry : conf) {
+ list.add(entry);
+ }
+ WritableUtils.writeVInt(stream, list.size());
+ for (Entry<String, String> entry : list) {
+ Text.writeString(stream, entry.getKey());
+ Text.writeString(stream, entry.getValue());
+ }
+ flush();
+ LOG.debug("Sent MessageType.SET_BSPJOB_CONF including " + list.size()
+ + " entries.");
+ }
+
+ @Override
+ public void setInputTypes(String keyType, String valueType)
+ throws IOException {
+ WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
+ Text.writeString(stream, keyType);
+ Text.writeString(stream, valueType);
+ flush();
+ LOG.debug("Sent MessageType.SET_INPUT_TYPES");
+ }
+
+ @Override
+ public void runSetup(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+
+ WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code);
+ WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+ WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+ flush();
+ setHasTask(true);
+ LOG.debug("Sent MessageType.RUN_SETUP");
+ }
+
+ @Override
+ public void runBsp(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+
+ WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code);
+ WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+ WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+ flush();
+ setHasTask(true);
+ LOG.debug("Sent MessageType.RUN_BSP");
+ }
+
+ @Override
+ public void runCleanup(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+
+ WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code);
+ WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
+ WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+ flush();
+ setHasTask(true);
+ LOG.debug("Sent MessageType.RUN_CLEANUP");
+ }
+
+ @Override
+ public int getPartition(String key, String value, int numTasks)
+ throws IOException {
+
+ WritableUtils.writeVInt(stream, MessageType.PARTITION_REQUEST.code);
+ Text.writeString(stream, key);
+ Text.writeString(stream, value);
+ WritableUtils.writeVInt(stream, numTasks);
+ flush();
+ // Log at most the first 10 characters; shorter values must not throw.
+ LOG.debug("Sent MessageType.PARTITION_REQUEST - key: " + key + " value: "
+ + value.substring(0, Math.min(10, value.length())) + "..." + " numTasks: " + numTasks);
+ int resultVal = 0;
+ synchronized (resultLock) {
+ try {
+ while (resultInt == null)
+ resultLock.wait();
+
+ resultVal = resultInt;
+ resultInt = null;
+
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ }
+ }
+ return resultVal;
+ }
+
+ @Override
+ public void abort() throws IOException {
+ WritableUtils.writeVInt(stream, MessageType.ABORT.code);
+ flush();
+ LOG.debug("Sent MessageType.ABORT");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ /**
+ * Close the connection and shutdown the handler thread.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public void close() throws IOException, InterruptedException {
+ // runCleanup(pipedInput,pipedOutput);
+ LOG.debug("closing connection");
+ endOfInput();
+
+ uplink.interrupt();
+ uplink.join();
+
+ uplink.closeConnection();
+ stream.close();
+ }
+
+ @Override
+ public boolean waitForFinish() throws IOException, InterruptedException {
+ // LOG.debug("waitForFinish... "+hasTask);
+ synchronized (hasTaskLock) {
+ try {
+ while (hasTask)
+ hasTaskLock.wait();
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ Thread.currentThread().interrupt(); // do not swallow the interrupt
+ }
+ }
+
+ return hasTask;
+ }
+
+ public void endOfInput() throws IOException {
+ WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
+ flush();
+ LOG.debug("Sent close command");
+ LOG.debug("Sent MessageType.CLOSE");
+ }
+
+ /**
+ * Write the given object to the stream. If it is a Text or BytesWritable,
+ * write it directly. Otherwise, write it to a buffer and then write the
+ * length and data to the stream.
+ *
+ * @param obj the object to write
+ * @throws IOException
+ */
+ protected void writeObject(Writable obj) throws IOException {
+ // For Text and BytesWritable, encode them directly, so that they end up
+ // in C++ as the natural translations.
+ if (obj instanceof Text) {
+ Text t = (Text) obj;
+ int len = t.getLength();
+ WritableUtils.writeVInt(stream, len);
+ stream.write(t.getBytes(), 0, len);
+ } else if (obj instanceof BytesWritable) {
+ BytesWritable b = (BytesWritable) obj;
+ int len = b.getLength();
+ WritableUtils.writeVInt(stream, len);
+ stream.write(b.getBytes(), 0, len);
+ } else {
+ buffer.reset();
+ obj.write(buffer);
+ int length = buffer.getLength();
+ WritableUtils.writeVInt(stream, length);
+ stream.write(buffer.getData(), 0, length);
+ }
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The abstract description of the downward (from Java to C++) Pipes protocol.
+ * All of these calls are asynchronous and return before the message has been
+ * processed.
+ *
+ * Adapted from Hadoop Pipes.
+ *
+ * NOTE(review): {@code getPartition} and {@code waitForFinish} return a value,
+ * so the "asynchronous" statement above presumably does not hold for them;
+ * confirm against the implementations. None of the four type parameters is
+ * referenced by a method declared in this interface.
+ *
+ * @param <K1> presumably the input key type - confirm with implementations
+ * @param <V1> presumably the input value type - confirm with implementations
+ * @param <K2> presumably the output key type - confirm with implementations
+ * @param <V2> presumably the output value type - confirm with implementations
+ */
+public interface DownwardProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> {
+
+ /**
+ * Start communication
+ *
+ * @throws IOException if the call fails on an I/O error
+ */
+ void start() throws IOException;
+
+ /**
+ * Set the BSP Job Configuration
+ *
+ * @param conf the job configuration to pass down
+ * @throws IOException if the call fails on an I/O error
+ */
+ void setBSPJobConf(Configuration conf) throws IOException;
+
+ /**
+ * Set the input types for BSP.
+ *
+ * @param keyType the name of the key's type
+ * @param valueType the name of the value's type
+ * @throws IOException if the call fails on an I/O error
+ */
+ void setInputTypes(String keyType, String valueType) throws IOException;
+
+ /**
+ * Request execution of the setup phase (named after the runSetup message).
+ *
+ * @param pipedInput NOTE(review): meaning is not documented here; presumably
+ *          whether input is passed through this protocol - confirm
+ * @param pipedOutput NOTE(review): meaning is not documented here;
+ *          presumably whether output is passed through this protocol -
+ *          confirm
+ * @throws IOException if the call fails on an I/O error
+ */
+ void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+ /**
+ * Request execution of the bsp phase (named after the runBsp message).
+ *
+ * @param pipedInput NOTE(review): meaning is not documented here; presumably
+ *          whether input is passed through this protocol - confirm
+ * @param pipedOutput NOTE(review): meaning is not documented here;
+ *          presumably whether output is passed through this protocol -
+ *          confirm
+ * @throws IOException if the call fails on an I/O error
+ */
+ void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+ /**
+ * Request execution of the cleanup phase (named after the runCleanup
+ * message).
+ *
+ * @param pipedInput NOTE(review): meaning is not documented here; presumably
+ *          whether input is passed through this protocol - confirm
+ * @param pipedOutput NOTE(review): meaning is not documented here;
+ *          presumably whether output is passed through this protocol -
+ *          confirm
+ * @throws IOException if the call fails on an I/O error
+ */
+ void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+
+ /**
+ * getPartition
+ *
+ * NOTE(review): presumably asks the other side which partition the given
+ * record belongs to - confirm against the implementations.
+ *
+ * @param key the record key in its String form
+ * @param value the record value in its String form
+ * @param numTasks presumably the number of partitions to choose from -
+ *          confirm
+ * @return the partition number; the valid range is not specified here
+ * @throws IOException if the call fails on an I/O error
+ */
+ int getPartition(String key, String value, int numTasks) throws IOException;
+
+ /**
+ * The task should stop as soon as possible, because something has gone wrong.
+ *
+ * @throws IOException if the call fails on an I/O error
+ */
+ void abort() throws IOException;
+
+ /**
+ * Flush the data through any buffers.
+ *
+ * @throws IOException if the call fails on an I/O error
+ */
+ void flush() throws IOException;
+
+ /**
+ * Close the connection.
+ *
+ * @throws IOException if the call fails on an I/O error
+ * @throws InterruptedException if the calling thread is interrupted while
+ *           closing
+ */
+ void close() throws IOException, InterruptedException;
+
+ /**
+ * waitForFinish
+ *
+ * NOTE(review): presumably blocks until the task has finished - confirm
+ * against the implementations.
+ *
+ * @return NOTE(review): the meaning of the flag is not specified here;
+ *         confirm against the implementations
+ * @throws IOException if the call fails on an I/O error
+ * @throws InterruptedException if the calling thread is interrupted while
+ *           waiting
+ */
+ boolean waitForFinish() throws IOException, InterruptedException;
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+/**
+ * Op-codes of the pipes protocol. Every constant carries the integer that is
+ * put on the wire; these integers must match the C++ codes or massive
+ * confusion will result.
+ */
+public enum MessageType {
+  START(0),
+  SET_BSPJOB_CONF(1),
+  SET_INPUT_TYPES(2),
+  RUN_SETUP(3),
+  RUN_BSP(4),
+  RUN_CLEANUP(5),
+  READ_KEYVALUE(6),
+  WRITE_KEYVALUE(7),
+  GET_MSG(8),
+  GET_MSG_COUNT(9),
+  SEND_MSG(10),
+  SYNC(11),
+  GET_ALL_PEERNAME(12),
+  GET_PEERNAME(13),
+  GET_PEER_INDEX(14),
+  GET_PEER_COUNT(15),
+  GET_SUPERSTEP_COUNT(16),
+  REOPEN_INPUT(17),
+  CLEAR(18),
+  CLOSE(19),
+  ABORT(20),
+  DONE(21),
+  TASK_DONE(22),
+  REGISTER_COUNTER(23),
+  INCREMENT_COUNTER(24),
+  SEQFILE_OPEN(25),
+  SEQFILE_READNEXT(26),
+  SEQFILE_APPEND(27),
+  SEQFILE_CLOSE(28),
+  PARTITION_REQUEST(29),
+  PARTITION_RESPONSE(30),
+  LOG(31);
+
+  /** The integer that represents this message type on the wire. */
+  final int code;
+
+  MessageType(int wireCode) {
+    this.code = wireCode;
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Streaming protocol that inherits from the binary protocol. Basically it
+ * writes everything as text to the peer, each command is separated by newlines.
+ * To distinguish op-codes (like SET_BSPJOB_CONF) from normal output, we use the
+ * surrounds %OP_CODE%=_possible_value.
+ *
+ * @param <K1> input key.
+ * @param <V1> input value.
+ * @param <K2> output key.
+ * @param <V2> output value.
+ */
+public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
+ extends BinaryProtocol<K1, V1, Text, Text> {
+
+ private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("=");
+
+ private final CyclicBarrier ackBarrier = new CyclicBarrier(2);
+ private volatile boolean brokenBarrier = false;
+
+ public StreamingProtocol(BSPPeer<K1, V1, Text, Text, BytesWritable> peer,
+ OutputStream out, InputStream in) throws IOException {
+ super(peer, out, in);
+ }
+
+ public class StreamingUplinkReaderThread extends
+ UplinkReader<K1, V1, Text, Text> {
+
+ private BufferedReader reader;
+
+ public StreamingUplinkReaderThread(
+ BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream stream)
+ throws IOException {
+ super(null, peer, stream);
+ reader = new BufferedReader(new InputStreamReader(inStream));
+ }
+
+ @Override
+ public void sendMessage() throws IOException {
+ String peerLine = reader.readLine();
+ String msgLine = reader.readLine();
+ peer.send(peerLine, new BytesWritable(msgLine.getBytes()));
+ }
+
+ @Override
+ public void getMessage() throws IOException {
+ BytesWritable currentMessage = peer.getCurrentMessage();
+ if (currentMessage != null)
+ writeLine(new String(currentMessage.getBytes()));
+ else
+ writeLine("%%-1%%");
+ }
+
+ @Override
+ public void getMessageCount() throws IOException {
+ writeLine("" + peer.getNumCurrentMessages());
+ }
+
+ @Override
+ public void getSuperstepCount() throws IOException {
+ writeLine("" + peer.getSuperstepCount());
+ }
+
+ @Override
+ public void getPeerName() throws IOException {
+ int id = Integer.parseInt(reader.readLine());
+ if (id == -1)
+ writeLine(peer.getPeerName());
+ else
+ writeLine(peer.getPeerName(id));
+ }
+
+ @Override
+ public void getPeerIndex() throws IOException {
+ writeLine("" + peer.getPeerIndex());
+ }
+
+ @Override
+ public void getAllPeerNames() throws IOException {
+ writeLine("" + peer.getAllPeerNames().length);
+ for (String s : peer.getAllPeerNames()) {
+ writeLine(s);
+ }
+ }
+
+ @Override
+ public void getPeerCount() throws IOException {
+ writeLine("" + peer.getAllPeerNames().length);
+ }
+
+ @Override
+ public void sync() throws IOException, SyncException, InterruptedException {
+ peer.sync();
+ writeLine(getProtocolString(MessageType.SYNC) + "_SUCCESS");
+ }
+
+ @Override
+ public void writeKeyValue() throws IOException {
+ String key = reader.readLine();
+ String value = reader.readLine();
+ peer.write(new Text(key), new Text(value));
+ }
+
+ @Override
+ public void readKeyValue() throws IOException {
+ KeyValuePair<K1, V1> readNext = peer.readNext();
+ if (readNext == null) {
+ writeLine("%%-1%%");
+ writeLine("%%-1%%");
+ } else {
+ writeLine(readNext.getKey() + "");
+ writeLine(readNext.getValue() + "");
+ }
+ }
+
+ @Override
+ public void reopenInput() throws IOException {
+ peer.reopenInput();
+ }
+
+ @Override
+ public int readCommand() throws IOException {
+ String readLine = reader.readLine();
+ if (readLine != null && !readLine.isEmpty()) {
+ String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2);
+ split[0] = split[0].replace("%", "");
+ if (checkAcks(split))
+ return -1;
+ try {
+ int parseInt = Integer.parseInt(split[0]);
+ if (parseInt == MessageType.LOG.code) {
+ LOG.info(split[1]);
+ return -1;
+ }
+ return parseInt;
+ } catch (NumberFormatException e) {
+ e.printStackTrace();
+ }
+ } else {
+ return -1;
+ }
+ return -2;
+ }
+
+ @Override
+ protected void onError(Throwable e) {
+ super.onError(e);
+ // break the barrier if we had an error
+ ackBarrier.reset();
+ brokenBarrier = true;
+ }
+
+ private boolean checkAcks(String[] readLine) {
+ if (readLine[0].startsWith("ACK_")) {
+ try {
+ ackBarrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ }
+
+ @Override
+ public void start() throws IOException {
+ writeLine(MessageType.START, null);
+ writeLine("" + CURRENT_PROTOCOL_VERSION);
+ setBSPJobConf(peer.getConfiguration());
+ try {
+ ackBarrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setBSPJobConf(Configuration conf) throws IOException {
+ writeLine(MessageType.SET_BSPJOB_CONF, null);
+ List<String> list = new ArrayList<String>();
+ for (Map.Entry<String, String> itm : conf) {
+ list.add(itm.getKey());
+ list.add(itm.getValue());
+ }
+ writeLine(list.size());
+ for (String entry : list) {
+ writeLine(entry);
+ }
+ flush();
+ }
+
+ @Override
+ public void runSetup(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+ writeLine(MessageType.RUN_SETUP, null);
+ waitOnAck();
+ }
+
+ @Override
+ public void runBsp(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+ writeLine(MessageType.RUN_BSP, null);
+ waitOnAck();
+ }
+
+ public void waitOnAck() {
+ try {
+ if (!brokenBarrier)
+ ackBarrier.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void runCleanup(boolean pipedInput, boolean pipedOutput)
+ throws IOException {
+ writeLine(MessageType.RUN_CLEANUP, null);
+ waitOnAck();
+ }
+
+ /*
+ * @Override public UplinkReaderThread getUplinkReader( BSPPeer<K1, V1, Text,
+ * Text, BytesWritable> peer, InputStream in) throws IOException { return new
+ * StreamingUplinkReaderThread(peer, in); }
+ */
+
+ public void writeLine(int msg) throws IOException {
+ writeLine("" + msg);
+ }
+
+ public void writeLine(String msg) throws IOException {
+ stream.write((msg + "\n").getBytes());
+ stream.flush();
+ }
+
+ public void writeLine(MessageType type, String msg) throws IOException {
+ stream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n")
+ .getBytes());
+ stream.flush();
+ }
+
+ public String getProtocolString(MessageType type) {
+ return "%" + type.code + "%=";
+ }
+
+}