You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/09 03:28:06 UTC
svn commit: r1465852 [6/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/
c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/
c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/
c++/utils/m4/ core/src/main/java/org/apache/hama/b...
Added: hama/trunk/c++/pipes/configure.ac
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/configure.ac?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/configure.ac (added)
+++ hama/trunk/c++/pipes/configure.ac Tue Apr 9 01:28:04 2013
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+AC_PREREQ(2.59)
+AC_INIT(hama-pipes, 0.1.0, martin.illecker@gmail.com)
+
+AM_INIT_AUTOMAKE([subdir-objects foreign no-dist])
+
+AC_CONFIG_SRCDIR([impl/HamaPipes.cc])
+# NOTE(review): AC_CONFIG_HEADER is the obsolete one-argument spelling;
+# modern autoconf prefers AC_CONFIG_HEADERS -- confirm the minimum
+# supported autoconf version before changing.
+AC_CONFIG_HEADER([impl/config.h])
+AC_CONFIG_FILES([Makefile])
+
+# Default install prefix: ../install relative to the build directory.
+AC_PREFIX_DEFAULT(`pwd`/../install)
+
+# Project-local macros -- presumably provided by the bundled m4/
+# directory shared with c++/utils; verify they are on aclocal's path.
+USE_HADOOP_UTILS
+HADOOP_PIPES_SETUP
+CHECK_INSTALL_CFLAG
+
+# Checks for programs.
+AC_PROG_CXX
+AC_PROG_LIBTOOL
+
+# Checks for libraries.
+
+# Checks for header files.
+AC_LANG(C++)
+AC_CHECK_HEADERS([unistd.h])
+
+# Checks for typedefs, structures, and compiler characteristics.
+AC_HEADER_STDBOOL
+AC_C_CONST
+AC_TYPE_OFF_T
+AC_TYPE_SIZE_T
+AC_FUNC_STRERROR_R
+
+# Checks for library functions.
+AC_CHECK_FUNCS([mkdir uname])
+AC_OUTPUT
Added: hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/debug/pipes-default-gdb-commands.txt?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt (added)
+++ hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt Tue Apr 9 01:28:04 2013
@@ -0,0 +1,3 @@
+info threads
+backtrace
+quit
Added: hama/trunk/c++/pipes/debug/pipes-default-script
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/debug/pipes-default-script?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/debug/pipes-default-script (added)
+++ hama/trunk/c++/pipes/debug/pipes-default-script Tue Apr 9 01:28:04 2013
@@ -0,0 +1,3 @@
+core=`find . -name 'core*'`
+# Only pipes programs have the 5th argument as the program name.
+gdb -quiet $5 -c $core -x $HADOOP_HOME/src/c++/pipes/debug/pipes-default-gdb-commands.txt
Added: hama/trunk/c++/pipes/depcomp
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/depcomp?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/depcomp (added)
+++ hama/trunk/c++/pipes/depcomp Tue Apr 9 01:28:04 2013
@@ -0,0 +1,530 @@
+#! /bin/sh
+# depcomp - compile a program generating dependencies as side-effects
+
+scriptversion=2005-07-09.11
+
+# Copyright (C) 1999, 2000, 2003, 2004, 2005 Free Software Foundation, Inc.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+# Originally written by Alexandre Oliva <ol...@dcc.unicamp.br>.
+
+case $1 in
+ '')
+ echo "$0: No command. Try \`$0 --help' for more information." 1>&2
+ exit 1;
+ ;;
+ -h | --h*)
+ cat <<\EOF
+Usage: depcomp [--help] [--version] PROGRAM [ARGS]
+
+Run PROGRAMS ARGS to compile a file, generating dependencies
+as side-effects.
+
+Environment variables:
+ depmode Dependency tracking mode.
+ source Source file read by `PROGRAMS ARGS'.
+ object Object file output by `PROGRAMS ARGS'.
+ DEPDIR directory where to store dependencies.
+ depfile Dependency file to output.
+ tmpdepfile Temporary file to use when outputing dependencies.
+ libtool Whether libtool is used (yes/no).
+
+Report bugs to <bu...@gnu.org>.
+EOF
+ exit $?
+ ;;
+ -v | --v*)
+ echo "depcomp $scriptversion"
+ exit $?
+ ;;
+esac
+
+if test -z "$depmode" || test -z "$source" || test -z "$object"; then
+ echo "depcomp: Variables source, object and depmode must be set" 1>&2
+ exit 1
+fi
+
+# Dependencies for sub/bar.o or sub/bar.obj go into sub/.deps/bar.Po.
+depfile=${depfile-`echo "$object" |
+ sed 's|[^\\/]*$|'${DEPDIR-.deps}'/&|;s|\.\([^.]*\)$|.P\1|;s|Pobj$|Po|'`}
+tmpdepfile=${tmpdepfile-`echo "$depfile" | sed 's/\.\([^.]*\)$/.T\1/'`}
+
+rm -f "$tmpdepfile"
+
+# Some modes work just like other modes, but use different flags. We
+# parameterize here, but still list the modes in the big case below,
+# to make depend.m4 easier to write. Note that we *cannot* use a case
+# here, because this file can only contain one case statement.
+if test "$depmode" = hp; then
+ # HP compiler uses -M and no extra arg.
+ gccflag=-M
+ depmode=gcc
+fi
+
+if test "$depmode" = dashXmstdout; then
+ # This is just like dashmstdout with a different argument.
+ dashmflag=-xM
+ depmode=dashmstdout
+fi
+
+case "$depmode" in
+gcc3)
+## gcc 3 implements dependency tracking that does exactly what
+## we want. Yay! Note: for some reason libtool 1.4 doesn't like
+## it if -MD -MP comes after the -MF stuff. Hmm.
+ "$@" -MT "$object" -MD -MP -MF "$tmpdepfile"
+ stat=$?
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile"
+ exit $stat
+ fi
+ mv "$tmpdepfile" "$depfile"
+ ;;
+
+gcc)
+## There are various ways to get dependency output from gcc. Here's
+## why we pick this rather obscure method:
+## - Don't want to use -MD because we'd like the dependencies to end
+## up in a subdir. Having to rename by hand is ugly.
+## (We might end up doing this anyway to support other compilers.)
+## - The DEPENDENCIES_OUTPUT environment variable makes gcc act like
+## -MM, not -M (despite what the docs say).
+## - Using -M directly means running the compiler twice (even worse
+## than renaming).
+ if test -z "$gccflag"; then
+ gccflag=-MD,
+ fi
+ "$@" -Wp,"$gccflag$tmpdepfile"
+ stat=$?
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile"
+ exit $stat
+ fi
+ rm -f "$depfile"
+ echo "$object : \\" > "$depfile"
+ alpha=ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
+## The second -e expression handles DOS-style file names with drive letters.
+ sed -e 's/^[^:]*: / /' \
+ -e 's/^['$alpha']:\/[^:]*: / /' < "$tmpdepfile" >> "$depfile"
+## This next piece of magic avoids the `deleted header file' problem.
+## The problem is that when a header file which appears in a .P file
+## is deleted, the dependency causes make to die (because there is
+## typically no way to rebuild the header). We avoid this by adding
+## dummy dependencies for each header file. Too bad gcc doesn't do
+## this for us directly.
+ tr ' ' '
+' < "$tmpdepfile" |
+## Some versions of gcc put a space before the `:'. On the theory
+## that the space means something, we add a space to the output as
+## well.
+## Some versions of the HPUX 10.20 sed can't process this invocation
+## correctly. Breaking it into two sed invocations is a workaround.
+ sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile"
+ rm -f "$tmpdepfile"
+ ;;
+
+hp)
+ # This case exists only to let depend.m4 do its work. It works by
+ # looking at the text of this script. This case will never be run,
+ # since it is checked for above.
+ exit 1
+ ;;
+
+sgi)
+ if test "$libtool" = yes; then
+ "$@" "-Wp,-MDupdate,$tmpdepfile"
+ else
+ "$@" -MDupdate "$tmpdepfile"
+ fi
+ stat=$?
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile"
+ exit $stat
+ fi
+ rm -f "$depfile"
+
+ if test -f "$tmpdepfile"; then # yes, the sourcefile depend on other files
+ echo "$object : \\" > "$depfile"
+
+ # Clip off the initial element (the dependent). Don't try to be
+ # clever and replace this with sed code, as IRIX sed won't handle
+ # lines with more than a fixed number of characters (4096 in
+ # IRIX 6.2 sed, 8192 in IRIX 6.5). We also remove comment lines;
+ # the IRIX cc adds comments like `#:fec' to the end of the
+ # dependency line.
+ tr ' ' '
+' < "$tmpdepfile" \
+ | sed -e 's/^.*\.o://' -e 's/#.*$//' -e '/^$/ d' | \
+ tr '
+' ' ' >> $depfile
+ echo >> $depfile
+
+ # The second pass generates a dummy entry for each header file.
+ tr ' ' '
+' < "$tmpdepfile" \
+ | sed -e 's/^.*\.o://' -e 's/#.*$//' -e '/^$/ d' -e 's/$/:/' \
+ >> $depfile
+ else
+ # The sourcefile does not contain any dependencies, so just
+ # store a dummy comment line, to avoid errors with the Makefile
+ # "include basename.Plo" scheme.
+ echo "#dummy" > "$depfile"
+ fi
+ rm -f "$tmpdepfile"
+ ;;
+
+aix)
+ # The C for AIX Compiler uses -M and outputs the dependencies
+ # in a .u file. In older versions, this file always lives in the
+ # current directory. Also, the AIX compiler puts `$object:' at the
+ # start of each line; $object doesn't have directory information.
+ # Version 6 uses the directory in both cases.
+ stripped=`echo "$object" | sed 's/\(.*\)\..*$/\1/'`
+ tmpdepfile="$stripped.u"
+ if test "$libtool" = yes; then
+ "$@" -Wc,-M
+ else
+ "$@" -M
+ fi
+ stat=$?
+
+ if test -f "$tmpdepfile"; then :
+ else
+ stripped=`echo "$stripped" | sed 's,^.*/,,'`
+ tmpdepfile="$stripped.u"
+ fi
+
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile"
+ exit $stat
+ fi
+
+ if test -f "$tmpdepfile"; then
+ outname="$stripped.o"
+ # Each line is of the form `foo.o: dependent.h'.
+ # Do two passes, one to just change these to
+ # `$object: dependent.h' and one to simply `dependent.h:'.
+ sed -e "s,^$outname:,$object :," < "$tmpdepfile" > "$depfile"
+ sed -e "s,^$outname: \(.*\)$,\1:," < "$tmpdepfile" >> "$depfile"
+ else
+ # The sourcefile does not contain any dependencies, so just
+ # store a dummy comment line, to avoid errors with the Makefile
+ # "include basename.Plo" scheme.
+ echo "#dummy" > "$depfile"
+ fi
+ rm -f "$tmpdepfile"
+ ;;
+
+icc)
+ # Intel's C compiler understands `-MD -MF file'. However on
+ # icc -MD -MF foo.d -c -o sub/foo.o sub/foo.c
+ # ICC 7.0 will fill foo.d with something like
+ # foo.o: sub/foo.c
+ # foo.o: sub/foo.h
+ # which is wrong. We want:
+ # sub/foo.o: sub/foo.c
+ # sub/foo.o: sub/foo.h
+ # sub/foo.c:
+ # sub/foo.h:
+ # ICC 7.1 will output
+ # foo.o: sub/foo.c sub/foo.h
+ # and will wrap long lines using \ :
+ # foo.o: sub/foo.c ... \
+ # sub/foo.h ... \
+ # ...
+
+ "$@" -MD -MF "$tmpdepfile"
+ stat=$?
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile"
+ exit $stat
+ fi
+ rm -f "$depfile"
+ # Each line is of the form `foo.o: dependent.h',
+ # or `foo.o: dep1.h dep2.h \', or ` dep3.h dep4.h \'.
+ # Do two passes, one to just change these to
+ # `$object: dependent.h' and one to simply `dependent.h:'.
+ sed "s,^[^:]*:,$object :," < "$tmpdepfile" > "$depfile"
+ # Some versions of the HPUX 10.20 sed can't process this invocation
+ # correctly. Breaking it into two sed invocations is a workaround.
+ sed 's,^[^:]*: \(.*\)$,\1,;s/^\\$//;/^$/d;/:$/d' < "$tmpdepfile" |
+ sed -e 's/$/ :/' >> "$depfile"
+ rm -f "$tmpdepfile"
+ ;;
+
+tru64)
+ # The Tru64 compiler uses -MD to generate dependencies as a side
+ # effect. `cc -MD -o foo.o ...' puts the dependencies into `foo.o.d'.
+ # At least on Alpha/Redhat 6.1, Compaq CCC V6.2-504 seems to put
+ # dependencies in `foo.d' instead, so we check for that too.
+ # Subdirectories are respected.
+ dir=`echo "$object" | sed -e 's|/[^/]*$|/|'`
+ test "x$dir" = "x$object" && dir=
+ base=`echo "$object" | sed -e 's|^.*/||' -e 's/\.o$//' -e 's/\.lo$//'`
+
+ if test "$libtool" = yes; then
+ # With Tru64 cc, shared objects can also be used to make a
+    # static library.  This mechanism is used in libtool 1.4 series to
+ # handle both shared and static libraries in a single compilation.
+ # With libtool 1.4, dependencies were output in $dir.libs/$base.lo.d.
+ #
+ # With libtool 1.5 this exception was removed, and libtool now
+ # generates 2 separate objects for the 2 libraries. These two
+    # compilations output dependencies in $dir.libs/$base.o.d and
+ # in $dir$base.o.d. We have to check for both files, because
+ # one of the two compilations can be disabled. We should prefer
+ # $dir$base.o.d over $dir.libs/$base.o.d because the latter is
+ # automatically cleaned when .libs/ is deleted, while ignoring
+ # the former would cause a distcleancheck panic.
+ tmpdepfile1=$dir.libs/$base.lo.d # libtool 1.4
+ tmpdepfile2=$dir$base.o.d # libtool 1.5
+ tmpdepfile3=$dir.libs/$base.o.d # libtool 1.5
+ tmpdepfile4=$dir.libs/$base.d # Compaq CCC V6.2-504
+ "$@" -Wc,-MD
+ else
+ tmpdepfile1=$dir$base.o.d
+ tmpdepfile2=$dir$base.d
+ tmpdepfile3=$dir$base.d
+ tmpdepfile4=$dir$base.d
+ "$@" -MD
+ fi
+
+ stat=$?
+ if test $stat -eq 0; then :
+ else
+ rm -f "$tmpdepfile1" "$tmpdepfile2" "$tmpdepfile3" "$tmpdepfile4"
+ exit $stat
+ fi
+
+ for tmpdepfile in "$tmpdepfile1" "$tmpdepfile2" "$tmpdepfile3" "$tmpdepfile4"
+ do
+ test -f "$tmpdepfile" && break
+ done
+ if test -f "$tmpdepfile"; then
+ sed -e "s,^.*\.[a-z]*:,$object:," < "$tmpdepfile" > "$depfile"
+ # That's a tab and a space in the [].
+ sed -e 's,^.*\.[a-z]*:[ ]*,,' -e 's,$,:,' < "$tmpdepfile" >> "$depfile"
+ else
+ echo "#dummy" > "$depfile"
+ fi
+ rm -f "$tmpdepfile"
+ ;;
+
+#nosideeffect)
+ # This comment above is used by automake to tell side-effect
+ # dependency tracking mechanisms from slower ones.
+
+dashmstdout)
+ # Important note: in order to support this mode, a compiler *must*
+ # always write the preprocessed file to stdout, regardless of -o.
+ "$@" || exit $?
+
+ # Remove the call to Libtool.
+ if test "$libtool" = yes; then
+ while test $1 != '--mode=compile'; do
+ shift
+ done
+ shift
+ fi
+
+ # Remove `-o $object'.
+ IFS=" "
+ for arg
+ do
+ case $arg in
+ -o)
+ shift
+ ;;
+ $object)
+ shift
+ ;;
+ *)
+ set fnord "$@" "$arg"
+ shift # fnord
+ shift # $arg
+ ;;
+ esac
+ done
+
+ test -z "$dashmflag" && dashmflag=-M
+ # Require at least two characters before searching for `:'
+ # in the target name. This is to cope with DOS-style filenames:
+ # a dependency such as `c:/foo/bar' could be seen as target `c' otherwise.
+ "$@" $dashmflag |
+ sed 's:^[ ]*[^: ][^:][^:]*\:[ ]*:'"$object"'\: :' > "$tmpdepfile"
+ rm -f "$depfile"
+ cat < "$tmpdepfile" > "$depfile"
+ tr ' ' '
+' < "$tmpdepfile" | \
+## Some versions of the HPUX 10.20 sed can't process this invocation
+## correctly. Breaking it into two sed invocations is a workaround.
+ sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile"
+ rm -f "$tmpdepfile"
+ ;;
+
+dashXmstdout)
+ # This case only exists to satisfy depend.m4. It is never actually
+ # run, as this mode is specially recognized in the preamble.
+ exit 1
+ ;;
+
+makedepend)
+ "$@" || exit $?
+ # Remove any Libtool call
+ if test "$libtool" = yes; then
+ while test $1 != '--mode=compile'; do
+ shift
+ done
+ shift
+ fi
+ # X makedepend
+ shift
+ cleared=no
+ for arg in "$@"; do
+ case $cleared in
+ no)
+ set ""; shift
+ cleared=yes ;;
+ esac
+ case "$arg" in
+ -D*|-I*)
+ set fnord "$@" "$arg"; shift ;;
+ # Strip any option that makedepend may not understand. Remove
+ # the object too, otherwise makedepend will parse it as a source file.
+ -*|$object)
+ ;;
+ *)
+ set fnord "$@" "$arg"; shift ;;
+ esac
+ done
+ obj_suffix="`echo $object | sed 's/^.*\././'`"
+ touch "$tmpdepfile"
+ ${MAKEDEPEND-makedepend} -o"$obj_suffix" -f"$tmpdepfile" "$@"
+ rm -f "$depfile"
+ cat < "$tmpdepfile" > "$depfile"
+ sed '1,2d' "$tmpdepfile" | tr ' ' '
+' | \
+## Some versions of the HPUX 10.20 sed can't process this invocation
+## correctly. Breaking it into two sed invocations is a workaround.
+ sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile"
+ rm -f "$tmpdepfile" "$tmpdepfile".bak
+ ;;
+
+cpp)
+ # Important note: in order to support this mode, a compiler *must*
+ # always write the preprocessed file to stdout.
+ "$@" || exit $?
+
+ # Remove the call to Libtool.
+ if test "$libtool" = yes; then
+ while test $1 != '--mode=compile'; do
+ shift
+ done
+ shift
+ fi
+
+ # Remove `-o $object'.
+ IFS=" "
+ for arg
+ do
+ case $arg in
+ -o)
+ shift
+ ;;
+ $object)
+ shift
+ ;;
+ *)
+ set fnord "$@" "$arg"
+ shift # fnord
+ shift # $arg
+ ;;
+ esac
+ done
+
+ "$@" -E |
+ sed -n -e '/^# [0-9][0-9]* "\([^"]*\)".*/ s:: \1 \\:p' \
+ -e '/^#line [0-9][0-9]* "\([^"]*\)".*/ s:: \1 \\:p' |
+ sed '$ s: \\$::' > "$tmpdepfile"
+ rm -f "$depfile"
+ echo "$object : \\" > "$depfile"
+ cat < "$tmpdepfile" >> "$depfile"
+ sed < "$tmpdepfile" '/^$/d;s/^ //;s/ \\$//;s/$/ :/' >> "$depfile"
+ rm -f "$tmpdepfile"
+ ;;
+
+msvisualcpp)
+ # Important note: in order to support this mode, a compiler *must*
+ # always write the preprocessed file to stdout, regardless of -o,
+ # because we must use -o when running libtool.
+ "$@" || exit $?
+ IFS=" "
+ for arg
+ do
+ case "$arg" in
+ "-Gm"|"/Gm"|"-Gi"|"/Gi"|"-ZI"|"/ZI")
+ set fnord "$@"
+ shift
+ shift
+ ;;
+ *)
+ set fnord "$@" "$arg"
+ shift
+ shift
+ ;;
+ esac
+ done
+ "$@" -E |
+ sed -n '/^#line [0-9][0-9]* "\([^"]*\)"/ s::echo "`cygpath -u \\"\1\\"`":p' | sort | uniq > "$tmpdepfile"
+ rm -f "$depfile"
+ echo "$object : \\" > "$depfile"
+ . "$tmpdepfile" | sed 's% %\\ %g' | sed -n '/^\(.*\)$/ s:: \1 \\:p' >> "$depfile"
+ echo " " >> "$depfile"
+ . "$tmpdepfile" | sed 's% %\\ %g' | sed -n '/^\(.*\)$/ s::\1\::p' >> "$depfile"
+ rm -f "$tmpdepfile"
+ ;;
+
+none)
+ exec "$@"
+ ;;
+
+*)
+ echo "Unknown depmode $depmode" 1>&2
+ exit 1
+ ;;
+esac
+
+exit 0
+
+# Local Variables:
+# mode: shell-script
+# sh-indentation: 2
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:
Added: hama/trunk/c++/pipes/impl/HamaPipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/impl/HamaPipes.cc?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/impl/HamaPipes.cc (added)
+++ hama/trunk/c++/pipes/impl/HamaPipes.cc Tue Apr 9 01:28:04 2013
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hama/Pipes.hh"
+#include "hadoop/SerialUtils.hh"
+#include "hadoop/StringUtils.hh"
+
+#include <map>
+#include <vector>
+
+#include <errno.h>
+#include <netinet/in.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <iostream>
+#include <fstream>
+
+#include <openssl/hmac.h>
+#include <openssl/buffer.h>
+
+#define stringify( name ) # name
+
+using std::map;
+using std::string;
+using std::vector;
+using std::cout;
+using std::endl;
+
+using namespace HadoopUtils;
+
+namespace HamaPipes {
+
+ bool logging;
+
+ /********************************************/
+ /****************** BSPJob ******************/
+ /********************************************/
+  // Concrete BSPJob backed by an in-memory string->string map. Entries are
+  // filled in from the SET_BSPJOB_CONF message and exposed to user code via
+  // the typed getters below.
+  class BSPJobImpl: public BSPJob {
+  private:
+    map<string, string> values;  // configuration key/value store
+  public:
+    // Insert or overwrite one configuration entry.
+    void set(const string& key, const string& value) {
+      values[key] = value;
+    }
+
+    // True if the configuration contains `key`.
+    virtual bool hasKey(const string& key) const {
+      return values.find(key) != values.end();
+    }
+
+    // Return the raw string value for `key`; throws HadoopUtils::Error
+    // when the key is absent.
+    virtual const string& get(const string& key) const {
+      map<string,string>::const_iterator itr = values.find(key);
+      if (itr == values.end()) {
+        throw Error("Key " + key + " not found in BSPJob");
+      }
+      return itr->second;
+    }
+
+    // Typed accessors: all throw (via get) when the key is missing;
+    // string-to-value conversion is delegated to the HadoopUtils helpers.
+    virtual int getInt(const string& key) const {
+      const string& val = get(key);
+      return toInt(val);
+    }
+
+    virtual float getFloat(const string& key) const {
+      const string& val = get(key);
+      return toFloat(val);
+    }
+
+    virtual bool getBoolean(const string&key) const {
+      const string& val = get(key);
+      return toBool(val);
+    }
+  };
+
+ /********************************************/
+ /************* DownwardProtocol *************/
+ /********************************************/
+  // Callback interface for messages flowing "downward" from the parent Java
+  // task to this C++ child. A Protocol implementation decodes the wire format
+  // and invokes these hooks on the concrete handler (not shown in this
+  // excerpt).
+  class DownwardProtocol {
+  public:
+    // Handshake: `protocol` is the wire-protocol version sent by the peer.
+    virtual void start(int protocol) = 0;
+    // Receive the flattened job configuration (alternating key,value items).
+    virtual void setBSPJob(vector<string> values) = 0;
+    // Java class names of the input key and value types.
+    virtual void setInputTypes(string keyType, string valueType) = 0;
+    // Deliver one input key/value pair read on the Java side.
+    virtual void setKeyValue(const string& _key, const string& _value) = 0;
+
+    // Run a BSP lifecycle phase; the piped* flags indicate whether
+    // input/output are streamed over the pipe.
+    virtual void runBsp(bool pipedInput, bool pipedOutput) = 0;
+    virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0;
+    virtual void runSetup(bool pipedInput, bool pipedOutput) = 0;
+    // Compute the partition of (key, value) among numTasks tasks.
+    virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;
+
+    // Deliver the result of a previously issued request (message count,
+    // peer name(s), superstep count, ...); overloaded per result type.
+    virtual void setNewResult(int32_t value) = 0;
+    virtual void setNewResult(int64_t value) = 0;
+    virtual void setNewResult(const string& value) = 0;
+    virtual void setNewResult(vector<string> value) = 0;
+
+    //virtual void reduceKey(const string& key) = 0;
+    //virtual void reduceValue(const string& value) = 0;
+    // Orderly shutdown vs. abnormal termination requested by the peer.
+    virtual void close() = 0;
+    virtual void abort() = 0;
+    virtual ~DownwardProtocol() {}
+  };
+
+ /********************************************/
+ /************** UpwardProtocol **************/
+ /********************************************/
+  // Interface for messages flowing "upward" from this C++ process to the
+  // parent Java task: commands, their parameters, and counter updates.
+  class UpwardProtocol {
+  public:
+    // Send a command (a MESSAGE_TYPE code), optionally followed by an
+    // int32 and/or an array of string parameters of length `size`.
+    virtual void sendCMD(int32_t cmd) = 0;
+    virtual void sendCMD(int32_t cmd, int32_t value) = 0;
+    virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0;
+    virtual void sendCMD(int32_t cmd, const string& value) = 0;
+    virtual void sendCMD(int32_t cmd, const string values[], int size) = 0;
+
+    //virtual void registerCounter(int id, const string& group, const string& name) = 0;
+    //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
+    // Add `amount` to the counter identified by (group, name) on the Java side.
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+    virtual ~UpwardProtocol() {}
+  };
+
+ /********************************************/
+ /***************** Protocol *****************/
+ /********************************************/
+  // Couples the downward event pump with its upward reply channel.
+  class Protocol {
+  public:
+    // Read and dispatch exactly one downward message.
+    virtual void nextEvent() = 0;
+    // Channel for sending commands back to the parent; ownership stays
+    // with the Protocol implementation.
+    virtual UpwardProtocol* getUplink() = 0;
+    virtual ~Protocol(){}
+  };
+
+ /********************************************/
+ /*************** MESSAGE_TYPE ***************/
+ /********************************************/
+  // Numeric command codes of the binary wire protocol shared with the Java
+  // side. NOTE(review): the order below must stay in sync with the Java
+  // peer's message-type enum and with messageTypeNames[] that follows --
+  // confirm both when adding or reordering commands.
+  enum MESSAGE_TYPE {
+    START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
+    RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+    READ_KEYVALUE, WRITE_KEYVALUE,
+    GET_MSG, GET_MSG_COUNT,
+    SEND_MSG, SYNC,
+    GET_ALL_PEERNAME, GET_PEERNAME,
+    GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+    REOPEN_INPUT, CLEAR,
+    CLOSE, ABORT,
+    DONE, TASK_DONE,
+    REGISTER_COUNTER, INCREMENT_COUNTER,
+    SEQFILE_OPEN, SEQFILE_READNEXT,
+    SEQFILE_APPEND, SEQFILE_CLOSE,
+    PARTITION_REQUEST, PARTITION_RESPONSE
+  };
+
+  /* Only needed for debugging output */
+  // Human-readable names indexed by MESSAGE_TYPE (messageTypeNames[cmd]);
+  // must list every enumerator above in the exact same order.
+  const char* messageTypeNames[] = {
+    stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
+    stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+    stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
+    stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
+    stringify( SEND_MSG ), stringify( SYNC ),
+    stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+    stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+    stringify( REOPEN_INPUT ), stringify( CLEAR ),
+    stringify( CLOSE ), stringify( ABORT ),
+    stringify( DONE ), stringify( TASK_DONE ),
+    stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+    stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+    stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+    stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
+  };
+
+ /********************************************/
+ /*********** BinaryUpwardProtocol ***********/
+ /********************************************/
+  // UpwardProtocol implementation that serializes commands onto a
+  // HadoopUtils::FileOutStream wrapped around the FILE* connected to the
+  // parent Java task. Every sendCMD variant flushes immediately so the
+  // peer sees the command without buffering delay.
+  class BinaryUpwardProtocol: public UpwardProtocol {
+  private:
+    FileOutStream* stream;  // owned; freed in the destructor
+  public:
+    // Wraps the given FILE*; asserts that opening the stream succeeds.
+    // NOTE(review): raw owning pointer with no copy ctor/assignment --
+    // copying an instance would double-delete `stream`; verify instances
+    // are never copied.
+    BinaryUpwardProtocol(FILE* _stream) {
+      stream = new FileOutStream();
+      HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
+
+    }
+
+    // Command with no arguments.
+    // NOTE(review): the debug trace indexes messageTypeNames[cmd] -- this
+    // assumes cmd is always a valid MESSAGE_TYPE value.
+    virtual void sendCMD(int32_t cmd) {
+      serializeInt(cmd, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
+              messageTypeNames[cmd]);
+    }
+
+    // Command followed by one int32 argument.
+    virtual void sendCMD(int32_t cmd, int32_t value) {
+      serializeInt(cmd, *stream);
+      serializeInt(value, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",messageTypeNames[cmd],value);
+    }
+
+    // Command followed by one string argument.
+    virtual void sendCMD(int32_t cmd, const string& value) {
+      serializeInt(cmd, *stream);
+      serializeString(value, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str());
+    }
+
+    // Command followed by `size` string arguments.
+    virtual void sendCMD(int32_t cmd, const string values[], int size) {
+      serializeInt(cmd, *stream);
+      for (int i=0; i<size; i++) {
+        serializeString(values[i], *stream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+      }
+      stream->flush();
+    }
+
+    // Command followed by one int32 and then `size` string arguments.
+    virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) {
+      serializeInt(cmd, *stream);
+      serializeInt(value, *stream);
+      for (int i=0; i<size; i++) {
+        serializeString(values[i], *stream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+      }
+      stream->flush();
+    }
+
+    /*
+     virtual void registerCounter(int id, const string& group,
+     const string& name) {
+     serializeInt(REGISTER_COUNTER, *stream);
+     serializeInt(id, *stream);
+     serializeString(group, *stream);
+     serializeString(name, *stream);
+     }
+
+     virtual void incrementCounter(const TaskContext::Counter* counter,
+     uint64_t amount) {
+     serializeInt(INCREMENT_COUNTER, *stream);
+     serializeInt(counter->getId(), *stream);
+     serializeLong(amount, *stream);
+     }
+     */
+
+    // Increment the (group, name) counter on the Java side by `amount`.
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+      serializeInt(INCREMENT_COUNTER, *stream);
+      serializeString(group, *stream);
+      serializeString(name, *stream);
+      serializeLong(amount, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
+    }
+
+    ~BinaryUpwardProtocol() {
+      delete stream;
+    }
+  };
+
+ /********************************************/
+ /************** BinaryProtocol **************/
+ /********************************************/
+ class BinaryProtocol: public Protocol {
+ private:
+ FileInStream* downStream;
+ DownwardProtocol* handler;
+ BinaryUpwardProtocol * uplink;
+
+ string key;
+ string value;
+
+ public:
+ BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
+ downStream = new FileInStream();
+ downStream->open(down);
+ uplink = new BinaryUpwardProtocol(up);
+ handler = _handler;
+
+ //authDone = false;
+ //getPassword(password);
+ }
+
+ UpwardProtocol* getUplink() {
+ return uplink;
+ }
+
+
+ virtual void nextEvent() {
+ int32_t cmd;
+ cmd = deserializeInt(*downStream);
+
+ switch (cmd) {
+
+ case START_MESSAGE: {
+ int32_t prot;
+ prot = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot);
+ handler->start(prot);
+ break;
+ }
+ /* SET BSP Job Configuration / Environment */
+ case SET_BSPJOB_CONF: {
+ int32_t entries;
+ entries = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries);
+ vector<string> result(entries*2);
+ for(int i=0; i < entries*2; ++i) {
+ string item;
+ deserializeString(item, *downStream);
+ result.push_back(item);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str());
+ }
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries);
+ handler->setBSPJob(result);
+ break;
+ }
+ case SET_INPUT_TYPES: {
+ string keyType;
+ string valueType;
+ deserializeString(keyType, *downStream);
+ deserializeString(valueType, *downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+ keyType.c_str(),valueType.c_str());
+ handler->setInputTypes(keyType, valueType);
+ break;
+ }
+ case READ_KEYVALUE: {
+ deserializeString(key, *downStream);
+ deserializeString(value, *downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
+ key.c_str(),
+ ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) );
+ handler->setKeyValue(key, value);
+ break;
+ }
+ case RUN_SETUP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runSetup(pipedInput, pipedOutput);
+ break;
+ }
+ case RUN_BSP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runBsp(pipedInput, pipedOutput);
+ break;
+ }
+ case RUN_CLEANUP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runCleanup(pipedInput, pipedOutput);
+ break;
+ }
+
+ case PARTITION_REQUEST: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
+ string partionKey;
+ string partionValue;
+ int32_t numTasks;
+ deserializeString(partionKey, *downStream);
+ deserializeString(partionValue, *downStream);
+ numTasks = deserializeInt(*downStream);
+ handler->runPartition(partionKey, partionValue, numTasks);
+ break;
+ }
+
+
+ case GET_MSG_COUNT: {
+ int32_t msgCount = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount);
+ handler->setNewResult(msgCount);
+ break;
+ }
+ case GET_MSG: {
+ string msg;
+ deserializeString(msg,*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str());
+ handler->setNewResult(msg);
+ break;
+ }
+ case GET_PEERNAME: {
+ string peername;
+ deserializeString(peername,*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str());
+ handler->setNewResult(peername);
+ break;
+ }
+ case GET_ALL_PEERNAME: {
+ vector<string> peernames;
+ int32_t peernameCount = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount);
+ string peername;
+ for (int i=0; i<peernameCount; i++) {
+ deserializeString(peername,*downStream);
+ peernames.push_back(peername);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",peername.c_str());
+ }
+ handler->setNewResult(peernames);
+ break;
+ }
+ case GET_PEER_INDEX: {
+ int32_t peerIndex = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex);
+ handler->setNewResult(peerIndex);
+ break;
+ }
+ case GET_PEER_COUNT: {
+ int32_t peerCount = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount);
+ handler->setNewResult(peerCount);
+ break;
+ }
+ case GET_SUPERSTEP_COUNT: {
+ int64_t superstepCount = deserializeLong(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount);
+ handler->setNewResult(superstepCount);
+ break;
+ }
+
+
+ case SEQFILE_OPEN: {
+ int32_t fileID = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID);
+ handler->setNewResult(fileID);
+ break;
+ }
+ case SEQFILE_READNEXT: {
+ deserializeString(key, *downStream);
+ deserializeString(value, *downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n",
+ key.c_str(),
+ ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) );
+ handler->setKeyValue(key, value);
+ break;
+ }
+ case SEQFILE_APPEND: {
+ int32_t result = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
+ handler->setNewResult(result);
+ break;
+ }
+ case SEQFILE_CLOSE: {
+ int32_t result = deserializeInt(*downStream);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
+ handler->setNewResult(result);
+ break;
+ }
+
+
+ case CLOSE: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+ handler->close();
+ break;
+ }
+ case ABORT: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+ handler->abort();
+ break;
+ }
+ default:
+ HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd);
+ }
+ }
+
+ // Tears down the protocol; releases the objects this instance owns.
+ // NOTE(review): assumes exclusive ownership of downStream and uplink —
+ // confirm no other component frees them (double-free risk otherwise).
+ virtual ~BinaryProtocol() {
+ delete downStream;
+ delete uplink;
+ }
+ };
+
+ /********************************************/
+ /************** BSPContextImpl **************/
+ /********************************************/
+ // Client-side task context: implements both the user-facing BSPContext API
+ // and the DownwardProtocol callbacks invoked by BinaryProtocol::nextEvent().
+ // The blocking getters work by sending a request on the uplink and pumping
+ // protocol events until the matching setNewResult*/setKeyValue callback
+ // raises its isNew* flag.
+ class BSPContextImpl: public BSPContext, public DownwardProtocol {
+ private:
+ bool done; // set by close(); guarded by mutexDone
+ BSPJob* job; // owned; built in setBSPJob()
+ //string key;
+ //const string* newKey;
+ //const string* value;
+ bool hasTask; // true while a setup/bsp/cleanup phase is running
+ //bool isNewKey;
+ //bool isNewValue;
+ string* inputKeyClass; // owned; Java class name of the input key type
+ string* inputValueClass; // owned; Java class name of the input value type
+
+ //string status;
+ //float progressFloat;
+ //uint64_t lastProgress;
+ //bool statusSet;
+
+ Protocol* protocol; // not owned; wired in setProtocol()
+ UpwardProtocol *uplink; // not owned; wired in setProtocol()
+
+ //string* inputSplit;
+
+ RecordReader* reader; // owned; lazily created in setupReaderWriter()
+ RecordWriter* writer; // owned; lazily created in setupReaderWriter()
+
+ BSP* bsp; // owned; the user's BSP implementation
+ Partitioner* partitioner; // created in start(); may be NULL
+
+ const Factory* factory; // not owned
+ pthread_mutex_t mutexDone; // protects 'done'
+ std::vector<int> registeredCounterIds;
+
+ // Reply slots filled by the setNewResult() overloads; each isNew* flag
+ // signals that the matching slot holds a fresh, unconsumed reply.
+ int32_t resultInt;
+ bool isNewResultInt;
+ int64_t resultLong;
+ bool isNewResultLong;
+ string resultString;
+ bool isNewResultString;
+ vector<string> resultVector;
+ bool isNewResultVector;
+
+ bool isNewKeyValuePair;
+ string currentKey;
+ string currentValue;
+
+ public:
+
+ /**
+ * Initializes every member to a safe default; real wiring happens later
+ * via setProtocol() and the DownwardProtocol callbacks.
+ */
+ BSPContextImpl(const Factory& _factory) {
+ done = false;
+ hasTask = false;
+ factory = &_factory;
+ job = NULL;
+
+ inputKeyClass = NULL;
+ inputValueClass = NULL;
+
+ bsp = NULL;
+ reader = NULL;
+ writer = NULL;
+ partitioner = NULL;
+ protocol = NULL;
+ uplink = NULL; // FIX: was never initialized; runSetup() dereferences it
+ pthread_mutex_init(&mutexDone, NULL);
+
+ resultInt = 0;
+ resultLong = 0;
+ isNewResultInt = false;
+ // FIX: isNewResultLong was never initialized, so the first
+ // getSuperstepCount() read an indeterminate flag (undefined behavior)
+ // and could return garbage without waiting for the reply.
+ isNewResultLong = false;
+ isNewResultString = false; // FIX: line ended with ',' (comma operator), not ';'
+ isNewResultVector = false;
+
+ isNewKeyValuePair = false;
+ }
+
+
+ /********************************************/
+ /*********** DownwardProtocol IMPL **********/
+ /********************************************/
+ // Handshake from the Java peer: only wire-protocol version 0 is
+ // supported. Also creates the (optional) user Partitioner.
+ virtual void start(int protocol) {
+ if (protocol != 0) {
+ throw Error("Protocol version " + toString(protocol) +
+ " not supported");
+ }
+ partitioner = factory->createPartitioner(*this);
+ }
+
+ // Builds the job configuration from a flat key/value list
+ // (values[0]=key1, values[1]=val1, ...); the length must be even.
+ virtual void setBSPJob(vector<string> values) {
+ int len = values.size();
+ BSPJobImpl* result = new BSPJobImpl();
+ HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
+ for(int i=0; i < len; i += 2) {
+ result->set(values[i], values[i+1]);
+ }
+ job = result;
+ }
+
+ // Records the Java class names of the input key/value types.
+ // NOTE(review): leaks the previous strings if called twice — presumably
+ // SET_INPUT_TYPES arrives only once per task; confirm against the peer.
+ virtual void setInputTypes(string keyType, string valueType) {
+ inputKeyClass = new string(keyType);
+ inputValueClass = new string(valueType);
+ }
+
+ // Stores a key/value pair received from the Java peer and marks it
+ // unconsumed; readNext()/sequenceFileReadNext() wait on this flag.
+ virtual void setKeyValue(const string& _key, const string& _value) {
+ currentKey = _key;
+ currentValue = _value;
+ isNewKeyValuePair = true;
+ }
+
+ /* private Method */
+ // Lazily creates the user's RecordReader/RecordWriter when the
+ // corresponding side is piped and none exists yet.
+ // NOTE(review): the asserts look inverted relative to the guards — inside
+ // 'if (pipedInput && reader==NULL)' the assert '(reader == NULL) ==
+ // pipedInput' only passes when the factory returned NULL (i.e. no custom
+ // reader was defined). This mirrors the Hadoop Pipes assert but with the
+ // opposite guard polarity; verify the intended semantics before changing.
+ void setupReaderWriter(bool pipedInput, bool pipedOutput) {
+
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+ (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
+
+ if (pipedInput && reader==NULL) {
+ reader = factory->createRecordReader(*this);
+ HADOOP_ASSERT((reader == NULL) == pipedInput,
+ pipedInput ? "RecordReader defined when not needed.":
+ "RecordReader not defined");
+
+ //if (reader != NULL) {
+ // value = new string();
+ //}
+ }
+
+ if (pipedOutput && writer==NULL) {
+ writer = factory->createRecordWriter(*this);
+ HADOOP_ASSERT((writer == NULL) == pipedOutput,
+ pipedOutput ? "RecordWriter defined when not needed.":
+ "RecordWriter not defined");
+ }
+ }
+
+ // Runs the user's setup() phase: lazily creates reader/writer and the
+ // BSP instance, executes the phase, then reports TASK_DONE upstream.
+ // hasTask brackets the phase so waitForTask() can observe activity.
+ virtual void runSetup(bool pipedInput, bool pipedOutput) {
+ setupReaderWriter(pipedInput,pipedOutput);
+
+ if (bsp == NULL)
+ bsp = factory->createBSP(*this);
+
+ if (bsp != NULL) {
+ hasTask = true;
+ bsp->setup(*this);
+ hasTask = false;
+ uplink->sendCMD(TASK_DONE);
+ }
+ }
+
+ // Runs the main bsp() superstep phase; same lifecycle as runSetup().
+ virtual void runBsp(bool pipedInput, bool pipedOutput) {
+ setupReaderWriter(pipedInput,pipedOutput);
+
+ if (bsp == NULL)
+ bsp = factory->createBSP(*this);
+
+ if (bsp != NULL) {
+ hasTask = true;
+ bsp->bsp(*this);
+ hasTask = false;
+ uplink->sendCMD(TASK_DONE);
+ }
+ }
+
+ // Runs the user's cleanup() phase. Unlike the two above it does not
+ // create a BSP instance — cleanup without a prior phase is a no-op.
+ virtual void runCleanup(bool pipedInput, bool pipedOutput) {
+ setupReaderWriter(pipedInput,pipedOutput);
+
+ if (bsp != NULL) {
+ hasTask = true;
+ bsp->cleanup(*this);
+ hasTask = false;
+ uplink->sendCMD(TASK_DONE);
+ }
+ }
+
+ /********************************************/
+ /******* Partitioner *******/
+ /********************************************/
+ // Invokes the user partitioner for one key/value and replies with the
+ // chosen partition index as a PARTITION_RESPONSE.
+ virtual void runPartition(const string& key, const string& value, int32_t numTasks){
+ if (partitioner != NULL) {
+ int part = partitioner->partition(key, value, numTasks);
+ uplink->sendCMD(PARTITION_RESPONSE, part);
+ } else {
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+ }
+ }
+
+ // The setNewResult overloads store a reply received from the Java peer
+ // and raise the matching isNewResult* flag; the blocking getters poll
+ // these flags while pumping protocol events.
+ virtual void setNewResult(int32_t _value) {
+ resultInt = _value;
+ isNewResultInt = true;
+ }
+
+ virtual void setNewResult(int64_t _value) {
+ resultLong = _value;
+ isNewResultLong = true;
+ }
+
+ virtual void setNewResult(const string& _value) {
+ resultString = _value;
+ isNewResultString = true;
+ }
+
+ virtual void setNewResult(vector<string> _value) {
+ resultVector = _value;
+ isNewResultVector = true;
+ }
+
+ // Marks the task finished; 'done' is mutex-protected and read by
+ // isDone()/waitForTask().
+ virtual void close() {
+ pthread_mutex_lock(&mutexDone);
+ done = true;
+ hasTask = false;
+ pthread_mutex_unlock(&mutexDone);
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+ (done)?"true":"false",(hasTask)?"true":"false");
+ }
+
+ // Abort requested by the Java driver; unwinds through runTask's catch.
+ virtual void abort() {
+ throw Error("Aborted by driver");
+ }
+
+ /********************************************/
+ /************** TaskContext IMPL ************/
+ /********************************************/
+
+ /**
+ * Get the BSPJob for the current task.
+ * May be NULL until the SET_BSPJOB_CONF event has been processed.
+ */
+ virtual const BSPJob* getBSPJob() {
+ return job;
+ }
+
+ /**
+ * Get the current key.
+ * @return the current key or NULL if called before the first map or reduce
+ */
+ //virtual const string& getInputKey() {
+ // return key;
+ //}
+
+ /**
+ * Get the current value.
+ * @return the current value or NULL if called before the first map or
+ * reduce
+ */
+ //virtual const string& getInputValue() {
+ // return *value;
+ //}
+
+ /**
+ * Register a counter with the given group and name.
+ */
+ /*
+ virtual Counter* getCounter(const std::string& group,
+ const std::string& name) {
+ int id = registeredCounterIds.size();
+ registeredCounterIds.push_back(id);
+ uplink->registerCounter(id, group, name);
+ return new Counter(id);
+ }*/
+
+ /**
+ * Increment the value of the counter with the given amount.
+ * Fire-and-forget: forwarded to the Java peer, no reply is awaited.
+ */
+ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+ uplink->incrementCounter(group, name, amount);
+ }
+
+ /********************************************/
+ /************** BSPContext IMPL *************/
+ /********************************************/
+
+ /**
+ * Access the InputSplit of the bsp.
+ */
+ //virtual const string& getInputSplit() {
+ // return *inputSplit;
+ //}
+
+ /**
+ * Get the name of the key class of the input to this task.
+ * NOTE(review): dereferences a NULL pointer if SET_INPUT_TYPES has not
+ * been received yet — confirm callers only use this after setup.
+ */
+ virtual const string& getInputKeyClass() {
+ return *inputKeyClass;
+ }
+
+ /**
+ * Get the name of the value class of the input to this task.
+ */
+ virtual const string& getInputValueClass() {
+ return *inputValueClass;
+ }
+
+ /**
+ * Send a data with a tag to another BSPSlave corresponding to hostname.
+ * Messages sent by this method are not guaranteed to be received in a sent
+ * order. Fire-and-forget: no reply is awaited.
+ */
+ virtual void sendMessage(const string& peerName, const string& msg) {
+ string values[] = {peerName, msg};
+ uplink->sendCMD(SEND_MSG,values, 2);
+ }
+
+ /**
+ * @return A message from the peer's received messages queue (a FIFO).
+ * Blocks: pumps protocol events until setNewResult(string) fires.
+ */
+ virtual const string& getCurrentMessage() {
+ uplink->sendCMD(GET_MSG);
+
+ while (!isNewResultString)
+ protocol->nextEvent();
+
+ isNewResultString = false;
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str());
+ return resultString;
+ }
+
+ /**
+ * @return The number of messages in the peer's received messages queue.
+ * Blocks until the GET_MSG_COUNT reply arrives.
+ */
+ virtual int getNumCurrentMessages() {
+ uplink->sendCMD(GET_MSG_COUNT);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ isNewResultInt = false;
+ return resultInt;
+ }
+
+ /**
+ * Barrier Synchronization.
+ *
+ * Sends all the messages in the outgoing message queues to the corresponding
+ * remote peers.
+ */
+ virtual void sync() {
+ uplink->sendCMD(SYNC);
+ }
+
+ /**
+ * @return the name of this peer in the format "hostname:port".
+ * Sends GET_PEERNAME with index -1, which the Java side treats as
+ * "this peer"; blocks until the string reply arrives.
+ */
+ virtual const string& getPeerName() {
+ uplink->sendCMD(GET_PEERNAME,-1);
+
+ while (!isNewResultString)
+ protocol->nextEvent();
+
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+ isNewResultString = false;
+ return resultString;
+ }
+
+ /**
+ * @return the name of n-th peer from sorted array by name.
+ */
+ virtual const string& getPeerName(int index) {
+ uplink->sendCMD(GET_PEERNAME,index);
+
+ while (!isNewResultString)
+ protocol->nextEvent();
+
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+ isNewResultString = false;
+ return resultString;
+ }
+
+ /**
+ * @return the names of all the peers executing tasks from the same job
+ * (including this peer).
+ */
+ virtual vector<string> getAllPeerNames() {
+ uplink->sendCMD(GET_ALL_PEERNAME);
+
+ while (!isNewResultVector)
+ protocol->nextEvent();
+
+ isNewResultVector = false;
+ return resultVector;
+ }
+
+ /**
+ * @return the index of this peer from sorted array by name.
+ */
+ virtual int getPeerIndex() {
+ uplink->sendCMD(GET_PEER_INDEX);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ isNewResultInt = false;
+ return resultInt;
+ }
+
+ /**
+ * @return the number of peers
+ */
+ virtual int getNumPeers() {
+ uplink->sendCMD(GET_PEER_COUNT);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ isNewResultInt = false;
+ return resultInt;
+ }
+
+ /**
+ * @return the count of current super-step
+ * NOTE(review): isNewResultLong is never initialized in the constructor
+ * as committed, so the very first call may read an indeterminate flag
+ * and return without waiting for the reply — verify.
+ */
+ virtual long getSuperstepCount() {
+ uplink->sendCMD(GET_SUPERSTEP_COUNT);
+
+ while (!isNewResultLong)
+ protocol->nextEvent();
+
+ isNewResultLong = false;
+ return resultLong;
+ }
+
+ /**
+ * Clears all queues entries. Fire-and-forget.
+ */
+ virtual void clear() {
+ uplink->sendCMD(CLEAR);
+ }
+
+ /**
+ * Writes a key/value pair to the output collector.
+ * Uses the local RecordWriter when one exists, otherwise ships the pair
+ * upstream as WRITE_KEYVALUE.
+ */
+ virtual void write(const string& key, const string& value) {
+ if (writer != NULL) {
+ writer->emit(key, value);
+ } else {
+ string values[] = {key, value};
+ uplink->sendCMD(WRITE_KEYVALUE, values, 2);
+ }
+ }
+
+ /**
+ * Deserializes the next input key value into the given objects;
+ * @return false when both key and value are empty (the peer's
+ * end-of-input convention).
+ * NOTE(review): a genuinely empty key/value pair is indistinguishable
+ * from end-of-input with this convention — verify acceptable.
+ */
+ virtual bool readNext(string& _key, string& _value) {
+ uplink->sendCMD(READ_KEYVALUE);
+
+ while (!isNewKeyValuePair)
+ protocol->nextEvent();
+
+ isNewKeyValuePair = false;
+
+ _key = currentKey;
+ _value = currentValue;
+
+ if (logging && _key.empty() && _value.empty())
+ fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
+
+ return (!_key.empty() && !_value.empty());
+ }
+
+ /**
+ * Closes the input and opens it right away, so that the file pointer is at
+ * the beginning again. Fire-and-forget.
+ */
+ virtual void reopenInput() {
+ uplink->sendCMD(REOPEN_INPUT);
+ }
+
+
+ /********************************************/
+ /******* SequenceFileConnector IMPL *******/
+ /********************************************/
+
+ /**
+ * Open SequenceFile with option "r" or "w".
+ * The Java peer performs the actual open; key/value Java class names
+ * are forwarded so it can construct the file.
+ * @return the corresponding fileID, or -1 for an invalid option
+ */
+ virtual int sequenceFileOpen(const string& path, const string& option,
+ const string& keyType, const string& valueType) {
+ if (logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str());
+
+ if ( (option.compare("r")==0) || (option.compare("w")==0)) {
+
+ string values[] = {path, option, keyType, valueType};
+ uplink->sendCMD(SEQFILE_OPEN,values, 4);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ isNewResultInt = false;
+ return resultInt;
+ } else {
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str());
+ return -1; //Error wrong option
+ }
+ }
+
+ /**
+ * Read next key/value pair from the SequenceFile with fileID.
+ * @return false when both key and value come back empty (end-of-file
+ * convention, same as readNext()).
+ */
+ virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) {
+
+ uplink->sendCMD(SEQFILE_READNEXT,fileID);
+
+ while (!isNewKeyValuePair)
+ protocol->nextEvent();
+
+ isNewKeyValuePair = false;
+
+ _key = currentKey;
+ _value = currentValue;
+
+ if (logging && _key.empty() && _value.empty())
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n");
+
+ return (!_key.empty() && !_value.empty());
+ }
+
+ /**
+ * Append the next key/value pair to the SequenceFile with fileID.
+ * @return true when the peer reports success (reply == 1)
+ */
+ virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) {
+ string values[] = {key, value};
+ uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ isNewResultInt = false;
+ return (resultInt==1);
+ }
+
+ /**
+ * Close SequenceFile.
+ * @return true when the peer reports a file was actually closed
+ */
+ virtual bool sequenceFileClose(int fileID) {
+ uplink->sendCMD(SEQFILE_CLOSE,fileID);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
+
+ if (logging && resultInt==0)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
+ else if (logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
+
+ isNewResultInt = false;
+ return (resultInt==1);
+ }
+
+ /********************************************/
+ /*************** Other STUFF ***************/
+ /********************************************/
+
+ // Wires the context to its protocol/uplink; must be called before any
+ // blocking getter or phase runner (both pointers start NULL).
+ void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
+ protocol = _protocol;
+ uplink = _uplink;
+ }
+
+ // Thread-safe snapshot of the 'done' flag (set by close()).
+ bool isDone() {
+ pthread_mutex_lock(&mutexDone);
+ bool doneCopy = done;
+ pthread_mutex_unlock(&mutexDone);
+ return doneCopy;
+ }
+
+ /**
+ * Advance to the next value.
+ */
+ /*
+ bool nextValue() {
+ if (isNewKey || done) {
+ return false;
+ }
+ isNewValue = false;
+ //progress();
+ protocol->nextEvent();
+ return isNewValue;
+ }
+ */
+ // Main event loop of runTask(): pumps protocol events until the peer
+ // either assigns work (hasTask) or closes the task (done).
+ // NOTE(review): reads done/hasTask without taking mutexDone — tolerable
+ // only if this thread is the sole writer via the event callbacks; verify.
+ void waitForTask() {
+ while (!done && !hasTask) {
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+ (done)?"true":"false",(hasTask)?"true":"false");
+ protocol->nextEvent();
+ }
+ }
+ /*
+ bool nextKey() {
+ if (reader == NULL) {
+ while (!isNewKey) {
+ nextValue();
+ if (done) {
+ return false;
+ }
+ }
+ key = *newKey;
+ } else {
+ if (!reader->next(key, const_cast<string&>(*value))) {
+ pthread_mutex_lock(&mutexDone);
+ done = true;
+ pthread_mutex_unlock(&mutexDone);
+ return false;
+ }
+ //progressFloat = reader->getProgress();
+ }
+ isNewKey = false;
+
+ if (bsp != NULL) {
+ bsp->bsp(*this);
+ }
+ return true;
+ }
+ */
+ // Flushes/closes the user-side components (reader, BSP, writer) before
+ // the DONE command is sent; does not free them (the destructor does).
+ void closeAll() {
+ if (reader) {
+ reader->close();
+ }
+
+ if (bsp) {
+ bsp->close();
+ }
+
+ if (writer) {
+ writer->close();
+ }
+ }
+
+ /**
+ * Releases everything this context owns. Called by runTask() after the
+ * DONE command has been sent.
+ */
+ virtual ~BSPContextImpl() {
+ delete job;
+ delete inputKeyClass;
+ delete inputValueClass;
+ delete reader;
+ delete bsp;
+ delete writer;
+ // FIX: the Partitioner created in start() was never released (leak);
+ // it is owned by this context, matching Hadoop Pipes' TaskContextImpl.
+ delete partitioner;
+ pthread_mutex_destroy(&mutexDone);
+ }
+ };
+
+ /**
+ * Ping the parent every 5 seconds to know if it is alive.
+ * Pthread entry point (currently not started — the pthread_create call in
+ * runTask() is commented out). Opens and immediately shuts a TCP
+ * connection to the GroomServer command port as a liveness probe; after
+ * MAX_RETRIES consecutive failures the process exits.
+ */
+ void* ping(void* ptr) {
+ BSPContextImpl* context = (BSPContextImpl*) ptr;
+ char* portStr = getenv("hama.pipes.command.port");
+ int MAX_RETRIES = 3;
+ int remaining_retries = MAX_RETRIES;
+ while (!context->isDone()) {
+ try{
+ sleep(5);
+ int sock = -1;
+ if (portStr) {
+ sock = socket(PF_INET, SOCK_STREAM, 0);
+ HADOOP_ASSERT(sock != - 1,
+ string("problem creating socket: ") + strerror(errno));
+ sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(toInt(portStr));
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr);
+ HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+ string("problem connecting command socket: ") +
+ strerror(errno));
+
+ }
+ if (sock != -1) {
+ int result = shutdown(sock, SHUT_RDWR);
+ HADOOP_ASSERT(result == 0, "problem shutting socket");
+ result = close(sock);
+ HADOOP_ASSERT(result == 0, "problem closing socket");
+ }
+ // successful probe: reset the failure budget
+ remaining_retries = MAX_RETRIES;
+ } catch (Error& err) {
+ if (!context->isDone()) {
+ fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
+ err.getMessage().c_str());
+ remaining_retries -= 1;
+ if (remaining_retries == 0) {
+ exit(1);
+ }
+ } else {
+ return NULL;
+ }
+ }
+ }
+ return NULL;
+ }
+
+ /**
+ * Run the assigned task in the framework.
+ * The user's main function should set the various functions using the
+ * set* functions above and then call this.
+ * @return true, if the task succeeded.
+ */
+ bool runTask(const Factory& factory) {
+ try {
+ HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!");
+
+ logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;
+ if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false");
+
+ BSPContextImpl* context = new BSPContextImpl(factory);
+ // FIX: initialize; previously indeterminate until one of the
+ // connection branches below assigned it.
+ Protocol* connection = NULL;
+
+ char* portStr = getenv("hama.pipes.command.port");
+ int sock = -1;
+ FILE* stream = NULL;
+ FILE* outStream = NULL;
+ char *bufin = NULL;
+ char *bufout = NULL;
+ if (portStr) {
+ // Socket transport: connect to the GroomServer's command port on
+ // the loopback interface.
+ sock = socket(PF_INET, SOCK_STREAM, 0);
+ HADOOP_ASSERT(sock != - 1,
+ string("problem creating socket: ") + strerror(errno));
+ sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(toInt(portStr));
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+ string("problem connecting command socket: ") +
+ strerror(errno));
+
+ stream = fdopen(sock, "r");
+ outStream = fdopen(sock, "w");
+
+ // increase buffer size to reduce per-command syscall overhead
+ int bufsize = 128*1024;
+ int vbufResult; // FIX: was named 'setbuf', shadowing stdio's setbuf()
+ bufin = new char[bufsize];
+ bufout = new char[bufsize];
+ vbufResult = setvbuf(stream, bufin, _IOFBF, bufsize);
+ HADOOP_ASSERT(vbufResult == 0, string("problem with setvbuf for inStream: ")
+ + strerror(errno));
+ vbufResult = setvbuf(outStream, bufout, _IOFBF, bufsize);
+ HADOOP_ASSERT(vbufResult == 0, string("problem with setvbuf for outStream: ")
+ + strerror(errno));
+
+ connection = new BinaryProtocol(stream, context, outStream);
+ if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr);
+
+ } else if (getenv("hama.pipes.command.file")) {
+ // File transport (debugging): read commands from a file, write
+ // responses to '<file>.out'.
+ char* filename = getenv("hama.pipes.command.file");
+ string outFilename = filename;
+ outFilename += ".out";
+ stream = fopen(filename, "r");
+ outStream = fopen(outFilename.c_str(), "w");
+ connection = new BinaryProtocol(stream, context, outStream);
+ } else {
+ //connection = new TextProtocol(stdin, context, stdout);
+ fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n");
+ delete context; // FIX: do not leak the context on the error path
+ // FIX: was 'return -1', which converts to 'true' in a bool-returning
+ // function and made callers treat this failure as success.
+ return false;
+ }
+
+ context->setProtocol(connection, connection->getUplink());
+
+ //pthread_t pingThread;
+ //pthread_create(&pingThread, NULL, ping, (void*)(context));
+
+ // Pump protocol events until the Java peer drives the task to
+ // completion (setup/bsp/cleanup phases run inside the event handlers).
+ context->waitForTask();
+
+ context->closeAll();
+ connection->getUplink()->sendCMD(DONE);
+
+ //pthread_join(pingThread,NULL);
+
+ delete context;
+ delete connection;
+ // FIX: dropped fflush(stream) — flushing an input stream is undefined
+ // in ISO C; only the write side needs flushing.
+ if (outStream != NULL) {
+ fflush(outStream);
+ }
+ fflush(stdout);
+ if (sock != -1) {
+ int result = shutdown(sock, SHUT_RDWR);
+ HADOOP_ASSERT(result == 0, "problem shutting socket");
+ result = close(sock);
+ HADOOP_ASSERT(result == 0, "problem closing socket");
+ }
+ // FIX: arrays allocated with new[] must be released with delete[]
+ // (plain delete on a new[] pointer is undefined behavior).
+ delete [] bufin;
+ delete [] bufout;
+ return true;
+ } catch (Error& err) {
+ fprintf(stderr, "Hama Pipes Exception: %s\n",
+ err.getMessage().c_str());
+ return false;
+ }
+ }
+}
+
Added: hama/trunk/c++/pipes/impl/config.h.in
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/impl/config.h.in?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/impl/config.h.in (added)
+++ hama/trunk/c++/pipes/impl/config.h.in Tue Apr 9 01:28:04 2013
@@ -0,0 +1,141 @@
+/* impl/config.h.in. Generated from configure.ac by autoheader. */
+
+/* Define to 1 if you have the declaration of `strerror_r', and to 0 if you
+ don't. */
+#undef HAVE_DECL_STRERROR_R
+
+/* Define to 1 if you have the <dlfcn.h> header file. */
+#undef HAVE_DLFCN_H
+
+/* Define to 1 if you have the <inttypes.h> header file. */
+#undef HAVE_INTTYPES_H
+
+/* Define to 1 if you have the `pthread' library (-lpthread). */
+#undef HAVE_LIBPTHREAD
+
+/* Define to 1 if you have the `ssl' library (-lssl). */
+#undef HAVE_LIBSSL
+
+/* Define to 1 if you have the <memory.h> header file. */
+#undef HAVE_MEMORY_H
+
+/* Define to 1 if you have the `mkdir' function. */
+#undef HAVE_MKDIR
+
+/* Define to 1 if you have the <pthread.h> header file. */
+#undef HAVE_PTHREAD_H
+
+/* Define to 1 if stdbool.h conforms to C99. */
+#undef HAVE_STDBOOL_H
+
+/* Define to 1 if you have the <stdint.h> header file. */
+#undef HAVE_STDINT_H
+
+/* Define to 1 if you have the <stdlib.h> header file. */
+#undef HAVE_STDLIB_H
+
+/* Define to 1 if you have the `strerror_r' function. */
+#undef HAVE_STRERROR_R
+
+/* Define to 1 if you have the <strings.h> header file. */
+#undef HAVE_STRINGS_H
+
+/* Define to 1 if you have the <string.h> header file. */
+#undef HAVE_STRING_H
+
+/* Define to 1 if you have the <sys/stat.h> header file. */
+#undef HAVE_SYS_STAT_H
+
+/* Define to 1 if you have the <sys/types.h> header file. */
+#undef HAVE_SYS_TYPES_H
+
+/* Define to 1 if you have the `uname' function. */
+#undef HAVE_UNAME
+
+/* Define to 1 if you have the <unistd.h> header file. */
+#undef HAVE_UNISTD_H
+
+/* Define to 1 if the system has the type `_Bool'. */
+#undef HAVE__BOOL
+
+/* Name of package */
+#undef PACKAGE
+
+/* Define to the address where bug reports for this package should be sent. */
+#undef PACKAGE_BUGREPORT
+
+/* Define to the full name of this package. */
+#undef PACKAGE_NAME
+
+/* Define to the full name and version of this package. */
+#undef PACKAGE_STRING
+
+/* Define to the one symbol short name of this package. */
+#undef PACKAGE_TARNAME
+
+/* Define to the home page for this package. */
+#undef PACKAGE_URL
+
+/* Define to the version of this package. */
+#undef PACKAGE_VERSION
+
+/* Define to 1 if you have the ANSI C header files. */
+#undef STDC_HEADERS
+
+/* Define to 1 if strerror_r returns char *. */
+#undef STRERROR_R_CHAR_P
+
+/* Enable extensions on AIX 3, Interix. */
+#ifndef _ALL_SOURCE
+# undef _ALL_SOURCE
+#endif
+/* Enable GNU extensions on systems that have them. */
+#ifndef _GNU_SOURCE
+# undef _GNU_SOURCE
+#endif
+/* Enable threading extensions on Solaris. */
+#ifndef _POSIX_PTHREAD_SEMANTICS
+# undef _POSIX_PTHREAD_SEMANTICS
+#endif
+/* Enable extensions on HP NonStop. */
+#ifndef _TANDEM_SOURCE
+# undef _TANDEM_SOURCE
+#endif
+/* Enable general extensions on Solaris. */
+#ifndef __EXTENSIONS__
+# undef __EXTENSIONS__
+#endif
+
+
+/* Version number of package */
+#undef VERSION
+
+/* Enable large inode numbers on Mac OS X 10.5. */
+#ifndef _DARWIN_USE_64_BIT_INODE
+# define _DARWIN_USE_64_BIT_INODE 1
+#endif
+
+/* Number of bits in a file offset, on hosts where this is settable. */
+#undef _FILE_OFFSET_BITS
+
+/* Define for large files, on AIX-style hosts. */
+#undef _LARGE_FILES
+
+/* Define to 1 if on MINIX. */
+#undef _MINIX
+
+/* Define to 2 if the system does not provide POSIX.1 features except with
+ this defined. */
+#undef _POSIX_1_SOURCE
+
+/* Define to 1 if you need to in order for `stat' and other things to work. */
+#undef _POSIX_SOURCE
+
+/* Define to empty if `const' does not conform to ANSI C. */
+#undef const
+
+/* Define to `long int' if <sys/types.h> does not define. */
+#undef off_t
+
+/* Define to `unsigned int' if <sys/types.h> does not define. */
+#undef size_t
Added: hama/trunk/c++/pipes/install-sh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/install-sh?rev=1465852&view=auto
==============================================================================
--- hama/trunk/c++/pipes/install-sh (added)
+++ hama/trunk/c++/pipes/install-sh Tue Apr 9 01:28:04 2013
@@ -0,0 +1,323 @@
+#!/bin/sh
+# install - install a program, script, or datafile
+
+scriptversion=2005-05-14.22
+
+# This originates from X11R5 (mit/util/scripts/install.sh), which was
+# later released in X11R6 (xc/config/util/install.sh) with the
+# following copyright and license.
+#
+# Copyright (C) 1994 X Consortium
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# X CONSORTIUM BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
+# AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNEC-
+# TION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+# Except as contained in this notice, the name of the X Consortium shall not
+# be used in advertising or otherwise to promote the sale, use or other deal-
+# ings in this Software without prior written authorization from the X Consor-
+# tium.
+#
+#
+# FSF changes to this file are in the public domain.
+#
+# Calling this script install-sh is preferred over install.sh, to prevent
+# `make' implicit rules from creating a file called install from it
+# when there is no Makefile.
+#
+# This script is compatible with the BSD install script, but was written
+# from scratch. It can only install one file at a time, a restriction
+# shared with many OS's install programs.
+
+# set DOITPROG to echo to test this script
+
+# Don't use :- since 4.3BSD and earlier shells don't like it.
+doit="${DOITPROG-}"
+
+# put in absolute paths if you don't have them in your path; or use env. vars.
+
+mvprog="${MVPROG-mv}"
+cpprog="${CPPROG-cp}"
+chmodprog="${CHMODPROG-chmod}"
+chownprog="${CHOWNPROG-chown}"
+chgrpprog="${CHGRPPROG-chgrp}"
+stripprog="${STRIPPROG-strip}"
+rmprog="${RMPROG-rm}"
+mkdirprog="${MKDIRPROG-mkdir}"
+
+chmodcmd="$chmodprog 0755"
+chowncmd=
+chgrpcmd=
+stripcmd=
+rmcmd="$rmprog -f"
+mvcmd="$mvprog"
+src=
+dst=
+dir_arg=
+dstarg=
+no_target_directory=
+
+usage="Usage: $0 [OPTION]... [-T] SRCFILE DSTFILE
+ or: $0 [OPTION]... SRCFILES... DIRECTORY
+ or: $0 [OPTION]... -t DIRECTORY SRCFILES...
+ or: $0 [OPTION]... -d DIRECTORIES...
+
+In the 1st form, copy SRCFILE to DSTFILE.
+In the 2nd and 3rd, copy all SRCFILES to DIRECTORY.
+In the 4th, create DIRECTORIES.
+
+Options:
+-c (ignored)
+-d create directories instead of installing files.
+-g GROUP $chgrpprog installed files to GROUP.
+-m MODE $chmodprog installed files to MODE.
+-o USER $chownprog installed files to USER.
+-s $stripprog installed files.
+-t DIRECTORY install into DIRECTORY.
+-T report an error if DSTFILE is a directory.
+--help display this help and exit.
+--version display version info and exit.
+
+Environment variables override the default commands:
+ CHGRPPROG CHMODPROG CHOWNPROG CPPROG MKDIRPROG MVPROG RMPROG STRIPPROG
+"
+
+while test -n "$1"; do
+ case $1 in
+ -c) shift
+ continue;;
+
+ -d) dir_arg=true
+ shift
+ continue;;
+
+ -g) chgrpcmd="$chgrpprog $2"
+ shift
+ shift
+ continue;;
+
+ --help) echo "$usage"; exit $?;;
+
+ -m) chmodcmd="$chmodprog $2"
+ shift
+ shift
+ continue;;
+
+ -o) chowncmd="$chownprog $2"
+ shift
+ shift
+ continue;;
+
+ -s) stripcmd=$stripprog
+ shift
+ continue;;
+
+ -t) dstarg=$2
+ shift
+ shift
+ continue;;
+
+ -T) no_target_directory=true
+ shift
+ continue;;
+
+ --version) echo "$0 $scriptversion"; exit $?;;
+
+ *) # When -d is used, all remaining arguments are directories to create.
+ # When -t is used, the destination is already specified.
+ test -n "$dir_arg$dstarg" && break
+ # Otherwise, the last argument is the destination. Remove it from $@.
+ for arg
+ do
+ if test -n "$dstarg"; then
+ # $@ is not empty: it contains at least $arg.
+ set fnord "$@" "$dstarg"
+ shift # fnord
+ fi
+ shift # arg
+ dstarg=$arg
+ done
+ break;;
+ esac
+done
+
+if test -z "$1"; then
+ if test -z "$dir_arg"; then
+ echo "$0: no input file specified." >&2
+ exit 1
+ fi
+ # It's OK to call `install-sh -d' without argument.
+ # This can happen when creating conditional directories.
+ exit 0
+fi
+
+for src
+do
+ # Protect names starting with `-'.
+ case $src in
+ -*) src=./$src ;;
+ esac
+
+ if test -n "$dir_arg"; then
+ dst=$src
+ src=
+
+ if test -d "$dst"; then
+ mkdircmd=:
+ chmodcmd=
+ else
+ mkdircmd=$mkdirprog
+ fi
+ else
+ # Waiting for this to be detected by the "$cpprog $src $dsttmp" command
+ # might cause directories to be created, which would be especially bad
+ # if $src (and thus $dsttmp) contains '*'.
+ if test ! -f "$src" && test ! -d "$src"; then
+ echo "$0: $src does not exist." >&2
+ exit 1
+ fi
+
+ if test -z "$dstarg"; then
+ echo "$0: no destination specified." >&2
+ exit 1
+ fi
+
+ dst=$dstarg
+ # Protect names starting with `-'.
+ case $dst in
+ -*) dst=./$dst ;;
+ esac
+
+ # If destination is a directory, append the input filename; won't work
+ # if double slashes aren't ignored.
+ if test -d "$dst"; then
+ if test -n "$no_target_directory"; then
+ echo "$0: $dstarg: Is a directory" >&2
+ exit 1
+ fi
+ dst=$dst/`basename "$src"`
+ fi
+ fi
+
+ # This sed command emulates the dirname command.
+ dstdir=`echo "$dst" | sed -e 's,/*$,,;s,[^/]*$,,;s,/*$,,;s,^$,.,'`
+
+ # Make sure that the destination directory exists.
+
+ # Skip lots of stat calls in the usual case.
+ if test ! -d "$dstdir"; then
+ defaultIFS='
+ '
+ IFS="${IFS-$defaultIFS}"
+
+ oIFS=$IFS
+ # Some sh's can't handle IFS=/ for some reason.
+ IFS='%'
+ set x `echo "$dstdir" | sed -e 's@/@%@g' -e 's@^%@/@'`
+ shift
+ IFS=$oIFS
+
+ pathcomp=
+
+ while test $# -ne 0 ; do
+ pathcomp=$pathcomp$1
+ shift
+ if test ! -d "$pathcomp"; then
+ $mkdirprog "$pathcomp"
+  	 # mkdir can fail with a `File exists' error in case several
+ # install-sh are creating the directory concurrently. This
+ # is OK.
+ test -d "$pathcomp" || exit
+ fi
+ pathcomp=$pathcomp/
+ done
+ fi
+
+ if test -n "$dir_arg"; then
+ $doit $mkdircmd "$dst" \
+ && { test -z "$chowncmd" || $doit $chowncmd "$dst"; } \
+ && { test -z "$chgrpcmd" || $doit $chgrpcmd "$dst"; } \
+ && { test -z "$stripcmd" || $doit $stripcmd "$dst"; } \
+ && { test -z "$chmodcmd" || $doit $chmodcmd "$dst"; }
+
+ else
+ dstfile=`basename "$dst"`
+
+ # Make a couple of temp file names in the proper directory.
+ dsttmp=$dstdir/_inst.$$_
+ rmtmp=$dstdir/_rm.$$_
+
+ # Trap to clean up those temp files at exit.
+ trap 'ret=$?; rm -f "$dsttmp" "$rmtmp" && exit $ret' 0
+ trap '(exit $?); exit' 1 2 13 15
+
+ # Copy the file name to the temp name.
+ $doit $cpprog "$src" "$dsttmp" &&
+
+ # and set any options; do chmod last to preserve setuid bits.
+ #
+ # If any of these fail, we abort the whole thing. If we want to
+ # ignore errors from any of these, just make sure not to ignore
+ # errors from the above "$doit $cpprog $src $dsttmp" command.
+ #
+ { test -z "$chowncmd" || $doit $chowncmd "$dsttmp"; } \
+ && { test -z "$chgrpcmd" || $doit $chgrpcmd "$dsttmp"; } \
+ && { test -z "$stripcmd" || $doit $stripcmd "$dsttmp"; } \
+ && { test -z "$chmodcmd" || $doit $chmodcmd "$dsttmp"; } &&
+
+ # Now rename the file to the real destination.
+ { $doit $mvcmd -f "$dsttmp" "$dstdir/$dstfile" 2>/dev/null \
+ || {
+ # The rename failed, perhaps because mv can't rename something else
+ # to itself, or perhaps because mv is so ancient that it does not
+ # support -f.
+
+ # Now remove or move aside any old file at destination location.
+ # We try this two ways since rm can't unlink itself on some
+ # systems and the destination file might be busy for other
+ # reasons. In this case, the final cleanup might fail but the new
+ # file should still install successfully.
+ {
+ if test -f "$dstdir/$dstfile"; then
+ $doit $rmcmd -f "$dstdir/$dstfile" 2>/dev/null \
+ || $doit $mvcmd -f "$dstdir/$dstfile" "$rmtmp" 2>/dev/null \
+ || {
+ echo "$0: cannot unlink or rename $dstdir/$dstfile" >&2
+ (exit 1); exit 1
+ }
+ else
+ :
+ fi
+ } &&
+
+ # Now rename the file to the real destination.
+ $doit $mvcmd "$dsttmp" "$dstdir/$dstfile"
+ }
+ }
+ fi || { (exit 1); exit 1; }
+done
+
+# The final little trick to "correctly" pass the exit status to the exit trap.
+{
+ (exit 0); exit 0
+}
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End: