You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [2/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.platform.custom;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.Platform;
+import org.apache.hadoop.nativetask.serde.custom.CustomWritable;
+import org.apache.hadoop.nativetask.serde.custom.CustomWritableSerializer;
+
+/**
+ * Example platform whose initialization registers
+ * {@link CustomWritableSerializer} for the {@link CustomWritable} key class.
+ */
+public class CustomPlatform extends Platform {
+
+  /** Value returned by {@link #name()}. */
+  private static final String PLATFORM_NAME = "CustomPlatform";
+
+  /**
+   * @return the fixed name of this platform
+   */
+  @Override
+  public String name() {
+    return PLATFORM_NAME;
+  }
+
+  /**
+   * Registers the serializer class under the fully-qualified class name of
+   * {@link CustomWritable}.
+   *
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void init() throws IOException {
+    final String keyClassName = CustomWritable.class.getName();
+    registerKey(keyClassName, CustomWritableSerializer.class);
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.serde.custom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class CustomWritable implements WritableComparable<CustomWritable>{
+
+ private int Id_a;
+ private long Id_b;
+
+ public CustomWritable() {
+ this.Id_a = 0;
+ this.Id_b = 0;
+ }
+
+ public CustomWritable(int a, long b){
+ this.Id_a = a;
+ this.Id_b = b;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ Id_a = in.readInt();
+ Id_b = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(Id_a);
+ out.writeLong(Id_b);
+ }
+
+ @Override
+ public int compareTo(CustomWritable that) {
+ if(Id_a > that.Id_a){
+ return 1;
+ }
+ if(Id_a < that.Id_a){
+ return -1;
+ }
+ if(Id_b > that.Id_b){
+ return 1;
+ }
+ if(Id_b < that.Id_b){
+ return -1;
+ }
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return Id_a + "\t" + Id_b;
+ }
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.serde.custom;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+import org.apache.hadoop.mapred.nativetask.serde.DefaultSerializer;
+
+public class CustomWritableSerializer extends DefaultSerializer implements
+ INativeComparable{
+
+ @Override
+ public int getLength(Writable w) throws IOException {
+ return 12;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include "NativeTask.h"
+
+using namespace NativeTask;
+
+namespace Custom {
+using namespace std;
+
+/**
+ * Reverses the byte order of a 32-bit value.
+ *
+ * Written with shifts and masks rather than x86 "bswap" inline assembly so
+ * that it compiles on any architecture.
+ */
+inline uint32_t bswap(uint32_t val) {
+  return ((val & 0x000000ffU) << 24) |
+         ((val & 0x0000ff00U) << 8) |
+         ((val & 0x00ff0000U) >> 8) |
+         ((val & 0xff000000U) >> 24);
+}
+
+/**
+ * Reads a big-endian (most significant byte first) signed 32-bit integer
+ * from the 4 bytes starting at src.
+ *
+ * The bytes are combined one at a time, so src needs no particular alignment
+ * and the result does not depend on the byte order of the host.
+ */
+int32_t ReadInt(const char * src) {
+  const unsigned char * bytes = reinterpret_cast<const unsigned char *>(src);
+  uint32_t val = 0;
+  for (int i = 0; i < 4; i++) {
+    val = (val << 8) | bytes[i];
+  }
+  return (int32_t) val;
+}
+
+/**
+ * Reverses the byte order of a 64-bit value.
+ *
+ * Swaps the 32-bit halves, then the 16-bit quarters, then adjacent bytes.
+ * Portable replacement for the former inline assembly.
+ */
+inline uint64_t bswap64(uint64_t val) {
+  val = ((val & 0x00000000ffffffffULL) << 32) | ((val >> 32) & 0x00000000ffffffffULL);
+  val = ((val & 0x0000ffff0000ffffULL) << 16) | ((val >> 16) & 0x0000ffff0000ffffULL);
+  val = ((val & 0x00ff00ff00ff00ffULL) << 8) | ((val >> 8) & 0x00ff00ff00ff00ffULL);
+  return val;
+}
+
+/**
+ * Reads a big-endian (most significant byte first) signed 64-bit integer
+ * from the 8 bytes starting at src.
+ *
+ * Like ReadInt, this works for unaligned src and on any host byte order.
+ */
+int64_t ReadLong(const char * src) {
+  const unsigned char * bytes = reinterpret_cast<const unsigned char *>(src);
+  uint64_t val = 0;
+  for (int i = 0; i < 8; i++) {
+    val = (val << 8) | bytes[i];
+  }
+  return (int64_t) val;
+}
+
+/**
+ * Orders two serialized keys. Each key is read as a big-endian int32 at
+ * offset 0 followed by a big-endian int64 at offset 4; keys are ordered by
+ * the int32 first and by the int64 second.
+ *
+ * srcLength and destLength are not inspected: both buffers are assumed to
+ * hold at least 12 readable bytes -- TODO confirm the caller guarantees this.
+ *
+ * Returns 1 if src sorts after dest, -1 if it sorts before, 0 if equal.
+ */
+int CustomComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  int32_t src_IDa = ReadInt(src);
+  int64_t src_IDb = ReadLong(src + 4);
+  int32_t dest_IDa = ReadInt(dest);
+  int64_t dest_IDb = ReadLong(dest + 4);
+  if (src_IDa > dest_IDa) {
+    return 1;
+  }
+  if (src_IDa < dest_IDa) {
+    return -1;
+  }
+  if (src_IDb > dest_IDb) {
+    return 1;
+  }
+  if (src_IDb < dest_IDb) {
+    return -1;
+  }
+  return 0;
+}
+
+// Registers CustomComparator for the library named "Custom" using the macros
+// from NativeTask.h (their exact effect is not visible here).
+DEFINE_NATIVE_LIBRARY(Custom) {
+  REGISTER_FUNCTION(CustomComparator,Custom);
+}
+
+}
+
+
+
+
+
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt Thu Jul 17 17:44:55 2014
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# Default to release builds, but only when the caller has not already chosen
+# a build type. The former "set(CMAKE_BUILD_TYPE, Release)" assigned to a
+# variable literally named "CMAKE_BUILD_TYPE," (with the comma) and therefore
+# never set a default.
+if (NOT CMAKE_BUILD_TYPE)
+  set(CMAKE_BUILD_TYPE Release)
+endif (NOT CMAKE_BUILD_TYPE)
+
+# Pull in the JVM/JNI detection; NO_POLICY_SCOPE lets policy settings made in
+# the included file remain in effect here.
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+# Build the given sources twice: as a shared library target LIBNAME and as a
+# static library target LIBNAME_static whose output file is also named LIBNAME.
+function(add_dual_library LIBNAME)
+  set(_static_target ${LIBNAME}_static)
+  add_library(${LIBNAME} SHARED ${ARGN})
+  add_library(${_static_target} STATIC ${ARGN})
+  set_target_properties(${_static_target} PROPERTIES OUTPUT_NAME ${LIBNAME})
+endfunction(add_dual_library)
+
+# Link the shared target LIBNAME and the static target LIBNAME_static against
+# the same list of libraries.
+function(target_link_dual_libraries LIBNAME)
+  foreach (_tgt ${LIBNAME} ${LIBNAME}_static)
+    target_link_libraries(${_tgt} ${ARGN})
+  endforeach (_tgt)
+endfunction(target_link_dual_libraries)
+
+# Place every kind of build output of target TGT (runtime, archive, library)
+# in the directory DIR below the build tree.
+function(output_directory TGT DIR)
+  set(_out_dir "${CMAKE_BINARY_DIR}/${DIR}")
+  set_target_properties(${TGT} PROPERTIES
+    RUNTIME_OUTPUT_DIRECTORY "${_out_dir}"
+    ARCHIVE_OUTPUT_DIRECTORY "${_out_dir}"
+    LIBRARY_OUTPUT_DIRECTORY "${_out_dir}")
+endfunction(output_directory TGT DIR)
+
+# Apply output_directory to both the shared target TGT and its static twin
+# TGT_static.
+function(dual_output_directory TGT DIR)
+  foreach (_tgt ${TGT} ${TGT}_static)
+    output_directory(${_tgt} "${DIR}")
+  endforeach (_tgt)
+endfunction(dual_output_directory TGT DIR)
+
+#
+# Restricts what later find_package and find_library calls can find by
+# overwriting the global CMAKE_FIND_LIBRARY_SUFFIXES variable.
+# Save that variable before invoking this macro and restore it once the
+# lookup is done.
+#
+# Effect of the new suffix list:
+# 1. Only shared libraries are found, never static ones;
+# 2. On Mac OS and most UNIX variants only the shared library carrying the
+#    version number LVERS is found; on FreeBSD the plain ".so" suffix is used.
+#
+# On Windows this macro does nothing, because Windows does not encode
+# version numbers in library file names.
+#
+macro(set_find_shared_library_version LVERS)
+  if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+    # Mac OS: lib<name>.<version>.dylib
+    set(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+  elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+    # FreeBSD always has the unversioned .so installed.
+    set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+  elseif (NOT (${CMAKE_SYSTEM_NAME} MATCHES "Windows"))
+    # Most UNIX variants: lib<name>.so.<version>
+    set(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}")
+  endif ()
+endmacro(set_find_shared_library_version LVERS)
+
+# The directory holding the javah-generated headers must be supplied by the
+# caller; it is added to the include path further down.
+if (NOT GENERATED_JAVAH)
+  MESSAGE(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH")
+endif (NOT GENERATED_JAVAH)
+find_package(JNI REQUIRED)
+
+# Save and restore the library suffix list BY VALUE. Without the ${...}
+# dereference the variable's name was stored instead of its contents, and the
+# "restore" overwrote CMAKE_FIND_LIBRARY_SUFFIXES with the literal string
+# "STORED_CMAKE_FIND_LIBRARY_SUFFIXES".
+# No library lookup happens between the save and the restore here.
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+# Primitive configs: compiler flags shared by all targets, and the directory
+# prefix D of the native sources.
+set(PRFLAGS "-DSIMPLE_MEMCPY")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PRFLAGS} -Wall")
+# NOTE(review): CMAKE_LD_FLAGS is not a variable CMake itself reads; confirm
+# whether these linker options (with the hard-coded amd64 path) take effect.
+set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -no-undefined -version-info 0:1:0
+ -L${_JAVA_HOME}/jre/lib/amd64/server -ljvm")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_C_FLAGS} -g -O2 -DNDEBUG -fPIC")
+set(D main/native/)
+
+# Save and restore the library suffix list BY VALUE; storing the bare name
+# would overwrite CMAKE_FIND_LIBRARY_SUFFIXES with the literal string
+# "STORED_CMAKE_FIND_LIBRARY_SUFFIXES" on restore.
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+# Probe for the headers and C library functions listed below; each result is
+# stored in the HAVE_* variable given as second argument.
+INCLUDE(CheckFunctionExists)
+INCLUDE(CheckCSourceCompiles)
+#INCLUDE(CheckLibraryExists)
+INCLUDE(CheckIncludeFiles)
+#CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
+#CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
+#CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+CHECK_INCLUDE_FILES(fcntl.h HAVE_FCNTL_H)
+CHECK_INCLUDE_FILES(malloc.h HAVE_MALLOC_H)
+CHECK_INCLUDE_FILES(mach/mach.h HAVE_MACH_MACH_H)
+CHECK_INCLUDE_FILES(memory.h HAVE_MEMORY_H)
+CHECK_INCLUDE_FILES(stddef.h HAVE_STDDEF_H)
+CHECK_INCLUDE_FILES(stdint.h HAVE_STDINT_H)
+CHECK_INCLUDE_FILES(stdlib.h HAVE_STDLIB_H)
+CHECK_INCLUDE_FILES(string.h HAVE_STRING_H)
+# Result variable was misspelled HAVE_UNITSTD_H.
+CHECK_INCLUDE_FILES(unistd.h HAVE_UNISTD_H)
+CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+CHECK_FUNCTION_EXISTS(localtime_r HAVE_LOCALTIME_R)
+CHECK_FUNCTION_EXISTS(memset HAVE_MEMSET)
+CHECK_FUNCTION_EXISTS(strchr HAVE_STRCHR)
+CHECK_FUNCTION_EXISTS(strtoul HAVE_STRTOUL)
+
+
+# Locate the snappy shared library (version 1) and its header.
+# The suffix list is saved and restored BY VALUE around the lookup; storing
+# the bare variable name would overwrite CMAKE_FIND_LIBRARY_SUFFIXES with the
+# literal string "STORED_CMAKE_FIND_LIBRARY_SUFFIXES" on restore.
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+find_library(SNAPPY_LIBRARY
+  NAMES snappy
+  PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/lib
+        ${CUSTOM_SNAPPY_PREFIX}/lib64 ${CUSTOM_SNAPPY_LIB})
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+find_path(SNAPPY_INCLUDE_DIR
+  NAMES snappy.h
+  PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/include
+        ${CUSTOM_SNAPPY_INCLUDE})
+if (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+  # Both parts found: record the library file name (substituted into
+  # config.h) and compile the snappy codec.
+  GET_FILENAME_COMPONENT(HADOOP_SNAPPY_LIBRARY ${SNAPPY_LIBRARY} NAME)
+  set(SNAPPY_SOURCE_FILES
+    "${D}/src/codec/SnappyCodec.cc")
+else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+  # Snappy is optional unless REQUIRE_SNAPPY is set: build without the codec.
+  set(SNAPPY_INCLUDE_DIR "")
+  set(SNAPPY_SOURCE_FILES "")
+  IF(REQUIRE_SNAPPY)
+    MESSAGE(FATAL_ERROR "Required snappy library could not be found. SNAPPY_LIBRARY=${SNAPPY_LIBRARY}, SNAPPY_INCLUDE_DIR=${SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_INCLUDE_DIR=${CUSTOM_SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_PREFIX=${CUSTOM_SNAPPY_PREFIX}, CUSTOM_SNAPPY_INCLUDE=${CUSTOM_SNAPPY_INCLUDE}")
+  ENDIF(REQUIRE_SNAPPY)
+endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+
+# Header search path for all targets defined below: generated JNI headers,
+# the native source tree, this directory, the JNI headers and (if found) snappy.
+include_directories(
+ ${GENERATED_JAVAH}
+ ${D}
+ ${D}/src
+ ${D}/src/util
+ ${D}/src/lib
+ ${D}/test
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ #${CMAKE_CURRENT_SOURCE_DIR}/src
+ #${CMAKE_BINARY_DIR}
+ ${JNI_INCLUDE_DIRS}
+ ${SNAPPY_INCLUDE_DIR}
+)
+
+#SET(CMAKE_SOURCE_DIR "/cygdrive/c/Users/tianlunz/repo/hadoop-2.2.0-src/hadoop-common-project/hadoop-common/src")
+# Generate config.h in the build directory from the config.h.cmake template.
+# NOTE(review): ${CMAKE_BINARY_DIR} is commented out of the include path above;
+# confirm that sources including config.h can still find it.
+CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
+
+
+# Link build-tree binaries with the install RPATH straight away.
+SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
+
+# The nativetask library, built as both a shared and a static target
+# (see add_dual_library). The snappy codec source is only present in the list
+# when snappy was found.
+add_dual_library(nativetask
+ ${D}/lz4/lz4.c
+ ${D}/cityhash/city.cc
+ ${D}/src/codec/BlockCodec.cc
+ ${D}/src/codec/GzipCodec.cc
+ ${D}/src/codec/Lz4Codec.cc
+ ${SNAPPY_SOURCE_FILES}
+ ${D}/src/handler/BatchHandler.cc
+ ${D}/src/handler/MCollectorOutputHandler.cc
+ ${D}/src/handler/AbstractMapHandler.cc
+ ${D}/src/handler/CombineHandler.cc
+ ${D}/src/lib/Buffers.cc
+ ${D}/src/lib/BufferStream.cc
+ ${D}/src/lib/Compressions.cc
+ ${D}/src/lib/PartitionBucket.cc
+ ${D}/src/lib/PartitionBucketIterator.cc
+ ${D}/src/lib/FileSystem.cc
+ ${D}/src/lib/IFile.cc
+ ${D}/src/lib/jniutils.cc
+ ${D}/src/lib/Log.cc
+ ${D}/src/lib/MapOutputCollector.cc
+ ${D}/src/lib/MapOutputSpec.cc
+ ${D}/src/lib/MemoryBlock.cc
+ ${D}/src/lib/Merge.cc
+ ${D}/src/lib/NativeLibrary.cc
+ ${D}/src/lib/Iterator.cc
+ ${D}/src/lib/NativeObjectFactory.cc
+ ${D}/src/lib/NativeRuntimeJniImpl.cc
+ ${D}/src/lib/NativeTask.cc
+ ${D}/src/lib/SpillInfo.cc
+ ${D}/src/lib/Path.cc
+ ${D}/src/lib/Streams.cc
+ ${D}/src/lib/Combiner.cc
+ ${D}/src/lib/TaskCounters.cc
+ ${D}/src/util/Checksum.cc
+ ${D}/src/util/Hash.cc
+ ${D}/src/util/Random.cc
+ ${D}/src/util/StringUtil.cc
+ ${D}/src/util/SyncUtils.cc
+ ${D}/src/util/Timer.cc
+ ${D}/src/util/WritableUtils.cc
+)
+# Only the shared target is linked here; the static target gets no link
+# libraries of its own.
+target_link_libraries(nativetask
+ #${LIB_DL}
+ dl
+ rt
+ pthread
+ z
+ ${SNAPPY_LIBRARY}
+ ${JAVA_JVM_LIBRARY}
+)
+
+# The nttest executable: the bundled gtest sources plus the native test
+# sources, linked against the static nativetask library.
+add_executable(nttest
+ ${D}/gtest/gtest-all.cc
+ ${D}/test/lib/TestByteArray.cc
+ ${D}/test/lib/TestByteBuffer.cc
+ ${D}/test/lib/TestComparatorForDualPivotQuickSort.cc
+ ${D}/test/lib/TestComparatorForStdSort.cc
+ ${D}/test/lib/TestFixSizeContainer.cc
+ ${D}/test/lib/TestMemoryPool.cc
+ ${D}/test/lib/TestIterator.cc
+ ${D}/test/lib/TestKVBuffer.cc
+ ${D}/test/lib/TestMemBlockIterator.cc
+ ${D}/test/lib/TestMemoryBlock.cc
+ ${D}/test/lib/TestPartitionBucket.cc
+ ${D}/test/lib/TestReadBuffer.cc
+ ${D}/test/lib/TestReadWriteBuffer.cc
+ ${D}/test/lib/TestTrackingCollector.cc
+ ${D}/test/util/TestChecksum.cc
+ ${D}/test/util/TestHash.cc
+ ${D}/test/util/TestStringUtil.cc
+ ${D}/test/util/TestSyncUtils.cc
+ ${D}/test/util/TestWritableUtils.cc
+ ${D}/test/TestCommand.cc
+ ${D}/test/TestConfig.cc
+ ${D}/test/TestCounter.cc
+ ${D}/test/TestCompressions.cc
+ ${D}/test/TestFileSystem.cc
+ ${D}/test/TestIFile.cc
+ ${D}/test/TestPrimitives.cc
+ ${D}/test/TestSort.cc
+ ${D}/test/TestMain.cc
+ ${D}/test/test_commons.cc)
+# NOTE(review): this replaces any previous CMAKE_EXE_LINKER_FLAGS value and
+# hard-codes the amd64 JVM directory, although JNIFlags.cmake computes the
+# architecture directory in _java_libarch -- confirm this is intended.
+set(CMAKE_EXE_LINKER_FLAGS "-L${_JAVA_HOME}/jre/lib/amd64/server -ljvm")
+target_link_libraries(nttest
+ nativetask_static
+ dl
+ rt
+ pthread
+ z
+ ${SNAPPY_LIBRARY}
+ # ${JAVA_JVM_LIBRARY}
+)
+#if (NEED_LINK_DL)
+# set(LIB_DL dl)
+#endif (NEED_LINK_DL)
+
+IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+ #
+ # By embedding '$ORIGIN' into the RPATH of libnativetask.so,
+ # dlopen will look in the directory containing libnativetask.so.
+ # However, $ORIGIN is not supported by all operating systems.
+ #
+ SET_TARGET_PROPERTIES(nativetask
+ PROPERTIES INSTALL_RPATH "\$ORIGIN/")
+ENDIF()
+
+# Shared-object version of the nativetask library.
+SET(LIBNATIVETASK_VERSION "1.0.0")
+SET_TARGET_PROPERTIES(nativetask PROPERTIES SOVERSION ${LIBNATIVETASK_VERSION})
+# Output locations, relative to the build directory: both library variants go
+# to target/usr/local/lib, the test executable goes to test.
+dual_output_directory(nativetask target/usr/local/lib)
+output_directory(nttest test)
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake Thu Jul 17 17:44:55 2014
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
+# This variable is set by maven.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+ # Force 32-bit code generation on amd64/x86_64, ppc64, sparc64
+ # (only done for GCC on a processor whose name ends in "64").
+ if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32")
+ set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32")
+ endif ()
+ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+ # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use
+ # the 32-bit version of libjvm.so.
+ # The JDK directory selection further down in this file also keys off
+ # this variable.
+ set(CMAKE_SYSTEM_PROCESSOR "i686")
+ endif ()
+endif (JVM_ARCH_DATA_MODEL EQUAL 32)
+
+# Determine float ABI of JVM on ARM Linux
+# Runs "readelf -A" on the JVM library; unless the output contains
+# "Tag_ABI_VFP_args: VFP registers", a soft-float JVM is assumed and
+# -mfloat-abi=softfp is added to the C flags.
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ find_program(READELF readelf)
+ if (READELF MATCHES "NOTFOUND")
+ message(WARNING "readelf not found; JVM float ABI detection disabled")
+ else (READELF MATCHES "NOTFOUND")
+ # NOTE(review): JAVA_JVM_LIBRARY is only located by the FIND_LIBRARY call
+ # further down in this file, so it looks unset here on a fresh configure;
+ # an empty readelf result then falls into the soft-float branch. Confirm.
+ execute_process(
+ COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+ OUTPUT_VARIABLE JVM_ELF_ARCH
+ ERROR_QUIET)
+ if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+ message("Soft-float JVM detected")
+
+ # Test compilation with -mfloat-abi=softfp using an arbitrary libc function
+ # (typically fails with "fatal error: bits/predefs.h: No such file or directory"
+ # if soft-float dev libraries are not installed)
+ include(CMakePushCheckState)
+ cmake_push_check_state()
+ set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+ include(CheckSymbolExists)
+ check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+ if (NOT SOFTFP_AVAILABLE)
+ message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+ endif (NOT SOFTFP_AVAILABLE)
+ cmake_pop_check_state()
+
+ # Only CMAKE_C_FLAGS is extended here.
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+ endif ()
+ endif (READELF MATCHES "NOTFOUND")
+endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+
+IF("${CMAKE_SYSTEM}" MATCHES "Linux")
+ #
+ # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+ # Since we were invoked from Maven, we know that the JAVA_HOME environment
+ # variable is valid. So we ignore system paths here and just use JAVA_HOME.
+ #
+ FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+ # Map the CMake processor name to the architecture directory name used
+ # below ${_JAVA_HOME}/jre/lib.
+ IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+ SET(_java_libarch "i386")
+ ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+ SET(_java_libarch "amd64")
+ ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+ SET(_java_libarch "arm")
+ ELSE()
+ SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+ ENDIF()
+ # Candidate directories below JAVA_HOME, searched for headers and libjvm.
+ SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+ "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+ "${_JAVA_HOME}/jre/lib/*"
+ "${_JAVA_HOME}/jre/lib"
+ "${_JAVA_HOME}/lib/*"
+ "${_JAVA_HOME}/lib"
+ "${_JAVA_HOME}/include/*"
+ "${_JAVA_HOME}/include"
+ "${_JAVA_HOME}"
+ )
+ FIND_PATH(JAVA_INCLUDE_PATH
+ NAMES jni.h
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ #In IBM java, it's jniport.h instead of jni_md.h
+ FIND_PATH(JAVA_INCLUDE_PATH2
+ NAMES jni_md.h jniport.h
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+ # NOTE(review): "rt" is listed ahead of "jvm"; confirm that a library named
+ # rt is really wanted here.
+ FIND_LIBRARY(JAVA_JVM_LIBRARY
+ NAMES rt jvm
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+ # NOTE(review): this prints the CMake variable JAVA_HOME, while the path
+ # used above is stored in _JAVA_HOME; JAVA_HOME may be empty unless the
+ # caller defines it -- confirm.
+ MESSAGE("JAVA_HOME=${JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+ MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+ IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+ MESSAGE("Located all JNI components successfully.")
+ ELSE()
+ MESSAGE(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+ ENDIF()
+ELSE()
+ # On every other system rely on CMake's own FindJNI module.
+ find_package(JNI REQUIRED)
+ENDIF()
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake Thu Jul 17 17:44:55 2014
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef CONFIG_H
+#define CONFIG_H
+
+/*
+ * Template line filled in by CMake's configure_file: it becomes a #define of
+ * HADOOP_SNAPPY_LIBRARY with the CMake variable's value when that variable
+ * is set, and is commented out otherwise.
+ */
+#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+
+#endif
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * A command identified by an integer id, optionally carrying a description.
+ *
+ * <p>Instances are immutable. Equality and hash code are based on the id
+ * only; the description is ignored.
+ */
+public class Command {
+
+  private final int id;
+  private final String description;
+
+  /**
+   * Creates a command without a description.
+   *
+   * @param id the command id
+   */
+  public Command(int id) {
+    this(id, null);
+  }
+
+  /**
+   * @param id the command id
+   * @param description text describing the command
+   */
+  public Command(int id, String description) {
+    this.id = id;
+    this.description = description;
+  }
+
+  /** @return the command id */
+  public int id() {
+    return this.id;
+  }
+
+  /**
+   * @return the description, or {@code null} if none was given
+   */
+  public String description() {
+    return this.description;
+  }
+
+  /**
+   * @return {@code true} if {@code other} is a {@code Command} with the same id
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof Command) {
+      return this.id == ((Command) other).id;
+    }
+    return false;
+  }
+
+  /**
+   * Uses the id as hash code so that equal commands hash alike, as required
+   * when {@link #equals(Object)} is overridden.
+   */
+  @Override
+  public int hashCode() {
+    return id;
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * Callback through which a {@link Command} arriving from upstream is
+ * handed over so that the corresponding operation can be performed.
+ */
+public interface CommandDispatcher {
+
+  /**
+   * Handles a single command.
+   *
+   * @param command the command to act on
+   * @param parameter data accompanying the command; may be null
+   * @return the result of the command; may be null
+   * @throws IOException if the command cannot be handled
+   */
+  ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Configuration key names, default values and fixed field sizes used by
+ * the nativetask classes.
+ *
+ * Every member is a true constant: the numeric values are declared
+ * {@code final} so they cannot be reassigned at runtime.
+ */
+public class Constants {
+
+  public static final String MAP_SORT_CLASS = "map.sort.class";
+  public static final String MAPRED_COMBINER_CLASS = "mapred.combiner.class";
+
+  public static final String MAPRED_MAPTASK_DELEGATOR_CLASS = "mapreduce.map.task.delegator.class";
+  public static final String MAPRED_REDUCETASK_DELEGATOR_CLASS = "mapreduce.reduce.task.delegator.class";
+  public static final String NATIVE_TASK_ENABLED = "native.task.enabled";
+  public static final String NATIVE_LOG_DEVICE = "native.log.device";
+  public static final String NATIVE_HADOOP_VERSION = "native.hadoop.version";
+
+  public static final String NATIVE_MAPPER_CLASS = "native.mapper.class";
+  public static final String NATIVE_REDUCER_CLASS = "native.reducer.class";
+  public static final String NATIVE_PARTITIONER_CLASS = "native.partitioner.class";
+  public static final String NATIVE_COMBINER_CLASS = "native.combiner.class";
+  public static final String NATIVE_INPUT_SPLIT = "native.input.split";
+
+  public static final String NATIVE_RECORDREADER_CLASS = "native.recordreader.class";
+  public static final String NATIVE_RECORDWRITER_CLASS = "native.recordwriter.class";
+  public static final String NATIVE_OUTPUT_FILE_NAME = "native.output.file.name";
+
+  // Buffer size key (value is in KB, per the key name) and its defaults.
+  public static final String NATIVE_PROCESSOR_BUFFER_KB = "native.processor.buffer.kb";
+  public static final int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64;
+  public static final int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024;
+
+  // NOTE(review): unit of the interval is not visible here - confirm.
+  public static final String NATIVE_STATUS_UPDATE_INTERVAL = "native.update.interval";
+  public static final int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000;
+
+  public static final String SERIALIZATION_FRAMEWORK = "SerializationFramework";
+
+  // Sizes of the length fields; the key/value length is the sum of the
+  // key-length and value-length fields.
+  public static final int SIZEOF_PARTITION_LENGTH = 4;
+  public static final int SIZEOF_KEY_LENGTH = 4;
+  public static final int SIZEOF_VALUE_LENGTH = 4;
+  public static final int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH;
+
+  public static final String NATIVE_CLASS_LIBRARY = "native.class.library";
+  public static final String NATIVE_CLASS_LIBRARY_CUSTOM = "native.class.library.custom";
+  public static final String NATIVE_CLASS_LIBRARY_BUILDIN = "native.class.library.buildin";
+  public static final String NATIVE_MAPOUT_KEY_COMPARATOR = "native.map.output.key.comparator";
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Direction in which data is exchanged over a channel.
+ */
+public enum DataChannel {
+  /** Data is only read from this channel. */
+  IN,
+  /** Data is only written to this channel. */
+  OUT,
+  /** Data is both read from and written to this channel. */
+  INOUT,
+  /** No data is exchanged over this channel. */
+  NONE
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * A DataReceiver pulls in arriving data; an example is
+ * {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller}.
+ */
+public interface DataReceiver {
+
+  /**
+   * Signals the receiver that data has arrived. Only the signal is
+   * delivered through this call; the data itself is transferred
+   * out of band.
+   *
+   * @return a boolean status; NOTE(review): its meaning is not defined
+   *         by this interface - confirm against the implementations
+   * @throws IOException if the arrived data cannot be received
+   */
+  boolean receiveData() throws IOException;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.*;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link Platform} named "Hadoop" that registers native serializers for
+ * the Hadoop Writable key types listed in {@link #init()}.
+ */
+public class HadoopPlatform extends Platform {
+  private static final Logger LOG = Logger.getLogger(HadoopPlatform.class);
+
+  public HadoopPlatform() throws IOException {
+  }
+
+  /**
+   * Registers one serializer class per supported key class name.
+   *
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void init() throws IOException {
+    registerKey(NullWritable.class.getName(), NullWritableSerializer.class);
+    registerKey(Text.class.getName(), TextSerializer.class);
+    registerKey(LongWritable.class.getName(), LongWritableSerializer.class);
+    registerKey(IntWritable.class.getName(), IntWritableSerializer.class);
+    registerKey(Writable.class.getName(), DefaultSerializer.class);
+    registerKey(BytesWritable.class.getName(), BytesWritableSerializer.class);
+    registerKey(BooleanWritable.class.getName(), BoolWritableSerializer.class);
+    registerKey(ByteWritable.class.getName(), ByteWritableSerializer.class);
+    registerKey(FloatWritable.class.getName(), FloatWritableSerializer.class);
+    registerKey(DoubleWritable.class.getName(), DoubleWritableSerializer.class);
+    registerKey(VIntWritable.class.getName(), VIntWritableSerializer.class);
+    registerKey(VLongWritable.class.getName(), VLongWritableSerializer.class);
+
+    LOG.info("Hadoop platform inited");
+  }
+
+  /**
+   * A key class is supported when its name is among the key class names
+   * of this platform and the serializer is an {@link INativeComparable}.
+   *
+   * @param keyClassName fully qualified name of the key class
+   * @param serializer serializer to be used for the key
+   * @param job job configuration; not consulted by this implementation
+   * @return true if both conditions hold
+   */
+  @Override
+  public boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
+    final boolean knownKey = keyClassNames.contains(keyClassName);
+    return knownKey && serializer instanceof INativeComparable;
+  }
+
+  /**
+   * @param comparatorClass comparator class; ignored
+   * @return always false
+   */
+  @Override
+  public boolean define(Class comparatorClass) {
+    return false;
+  }
+
+  /**
+   * @return the platform name, "Hadoop"
+   */
+  @Override
+  public String name() {
+    return "Hadoop";
+  }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * Interacts with the native side to support a Java combiner.
+ */
+public interface ICombineHandler {
+
+  /**
+   * Runs the combiner.
+   *
+   * @throws IOException if combining fails
+   */
+  void combine() throws IOException;
+
+  /**
+   * @return the id of this handler
+   */
+  long getId();
+
+  /**
+   * Closes the handlers, buffer pullers and pushers.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Marker interface: any key type that is comparable at the native side
+ * must implement it.
+ *
+ * A native comparator function should have the ComparatorPtr type:
+ *
+ * <pre>
+ *   typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength,
+ *                                const char * dest, uint32_t destLength);
+ * </pre>
+ *
+ * Keys are in serialized format at the native side. The function is
+ * passed the keys' locations and lengths so that they can be compared
+ * with the same logic as their Java comparator.
+ *
+ * For example, a HiveKey (see {@code HiveKey#write}) is serialized as an
+ * int field (containing the length of the raw bytes) followed by the raw
+ * bytes. To compare two HiveKeys, first read the length field, then
+ * compare the raw bytes by invoking the BytesComparator provided by the
+ * library, passing it the location and length of the raw bytes:
+ *
+ * <pre>
+ *   int HivePlatform::HiveKeyComparator(const char * src, uint32_t srcLength,
+ *                                       const char * dest, uint32_t destLength) {
+ *     uint32_t sl = bswap(*(uint32_t*)src);
+ *     uint32_t dl = bswap(*(uint32_t*)dest);
+ *     return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
+ *   }
+ * </pre>
+ */
+public interface INativeComparable {
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A handler that accepts input and gives output; it can be used to
+ * transfer both commands and data.
+ */
+public interface INativeHandler extends NativeDataTarget, NativeDataSource {
+
+  /**
+   * @return the name of this handler
+   */
+  String name();
+
+  /**
+   * @return the native handler, represented as a long
+   */
+  long getNativeHandler();
+
+  /**
+   * Initializes the native handler.
+   *
+   * @param conf configuration used for initialization
+   * @throws IOException if initialization fails
+   */
+  void init(Configuration conf) throws IOException;
+
+  /**
+   * Closes the native handler.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+
+  /**
+   * Calls a command on the downstream side.
+   *
+   * @param command the command to call
+   * @param parameter data accompanying the command
+   * @return the result of the call
+   * @throws IOException if the call fails
+   */
+  ReadWriteBuffer call(Command command, ReadWriteBuffer parameter) throws IOException;
+
+  /**
+   * Sets the dispatcher for commands.
+   *
+   * @param handler the dispatcher to use
+   */
+  void setCommandDispatcher(CommandDispatcher handler);
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+
+/**
+ * used to create channel, transfer data and command between Java and native
+ */
+public class NativeBatchProcessor implements INativeHandler {
+ private static Log LOG = LogFactory.getLog(NativeBatchProcessor.class);
+
+ private final String nativeHandlerName;
+ private long nativeHandlerAddr;
+
+ private boolean isInputFinished = false;
+
+ // << Field used directly in Native, the name must NOT be changed
+ private ByteBuffer rawOutputBuffer;
+ private ByteBuffer rawInputBuffer;
+ // >>
+
+ private InputBuffer in;
+ private OutputBuffer out;
+
+ private CommandDispatcher commandDispatcher;
+ private DataReceiver dataReceiver;
+
+ static {
+ if (NativeRuntime.isNativeLibraryLoaded()) {
+ InitIDs();
+ }
+ }
+
+ public static INativeHandler create(String nativeHandlerName,
+ Configuration conf, DataChannel channel) throws IOException {
+
+ final int bufferSize = conf.getInt(Constants.NATIVE_PROCESSOR_BUFFER_KB,
+ 1024) * 1024;
+
+ LOG.info("NativeHandler: direct buffer size: " + bufferSize);
+
+ OutputBuffer out = null;
+ InputBuffer in = null;
+
+ switch (channel) {
+ case IN:
+ in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+ break;
+ case OUT:
+ out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+ break;
+ case INOUT:
+ in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+ out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+ break;
+ case NONE:
+ }
+
+ final INativeHandler handler = new NativeBatchProcessor(nativeHandlerName,
+ in, out);
+ handler.init(conf);
+ return handler;
+ }
+
+ protected NativeBatchProcessor(String nativeHandlerName, InputBuffer input,
+ OutputBuffer output) throws IOException {
+ this.nativeHandlerName = nativeHandlerName;
+
+ if (null != input) {
+ this.in = input;
+ this.rawInputBuffer = input.getByteBuffer();
+ }
+ if (null != output) {
+ this.out = output;
+ this.rawOutputBuffer = output.getByteBuffer();
+ }
+ }
+
+ @Override
+ public void setCommandDispatcher(CommandDispatcher handler) {
+ this.commandDispatcher = handler;
+ }
+
+ @Override
+ public void init(Configuration conf) throws IOException {
+ this.nativeHandlerAddr = NativeRuntime
+ .createNativeObject(nativeHandlerName);
+ if (this.nativeHandlerAddr == 0) {
+ throw new RuntimeException("Native object create failed, class: "
+ + nativeHandlerName);
+ }
+ setupHandler(nativeHandlerAddr, ConfigUtil.toBytes(conf));
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (nativeHandlerAddr != 0) {
+ NativeRuntime.releaseNativeObject(nativeHandlerAddr);
+ nativeHandlerAddr = 0;
+ }
+ if (null != in && null != in.getByteBuffer() && in.getByteBuffer().isDirect()) {
+ DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer());
+ }
+ }
+
+ @Override
+ public long getNativeHandler() {
+ return nativeHandlerAddr;
+ }
+
+ @Override
+ public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter)
+ throws IOException {
+ final byte[] bytes = nativeCommand(nativeHandlerAddr, command.id(),
+ null == parameter ? null : parameter.getBuff());
+
+ final ReadWriteBuffer result = new ReadWriteBuffer(bytes);
+ result.setWritePoint(bytes.length);
+ return result;
+ }
+
+ @Override
+ public void sendData() throws IOException {
+ nativeProcessInput(nativeHandlerAddr, rawOutputBuffer.position());
+ rawOutputBuffer.position(0);
+ }
+
+ @Override
+ public void finishSendData() throws IOException {
+ if (null == rawOutputBuffer || isInputFinished) {
+ return;
+ }
+
+ sendData();
+ nativeFinish(nativeHandlerAddr);
+ isInputFinished = true;
+ }
+
+ private byte[] sendCommandToJava(int command, byte[] data) throws IOException {
+ try {
+
+ final Command cmd = new Command(command);
+ ReadWriteBuffer param = null;
+
+ if (null != data) {
+ param = new ReadWriteBuffer();
+ param.reset(data);
+ param.setWritePoint(data.length);
+ }
+
+ if (null != commandDispatcher) {
+ ReadWriteBuffer result = null;
+
+ result = commandDispatcher.onCall(cmd, param);
+ if (null != result) {
+ return result.getBuff();
+ } else {
+ return null;
+ }
+ } else {
+ return null;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Called by native side, clean output buffer so native side can continue
+ * processing
+ */
+ private void flushOutput(int length) throws IOException {
+
+ if (null != rawInputBuffer) {
+ rawInputBuffer.position(0);
+ rawInputBuffer.limit(length);
+
+ if (null != dataReceiver) {
+ try {
+ dataReceiver.receiveData();
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Cache JNI field & method ids
+ */
+ private static native void InitIDs();
+
+ /**
+ * Setup native side BatchHandler
+ */
+ private native void setupHandler(long nativeHandlerAddr, byte[][] configs);
+
+ /**
+ * Let native side to process data in inputBuffer
+ *
+ * @param handler
+ * @param length
+ */
+ private native void nativeProcessInput(long handler, int length);
+
+ /**
+ * Notice native side input is finished
+ *
+ * @param handler
+ */
+ private native void nativeFinish(long handler);
+
+ /**
+ * Send control message to native side
+ *
+ * @param cmd
+ * command data
+ * @return return value
+ */
+ private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);
+
+ /**
+ * Load data from native
+ *
+ * @return
+ */
+ private native void nativeLoadData(long handler);
+
+ protected void finishOutput() {
+ }
+
+ @Override
+ public InputBuffer getInputBuffer() {
+ return this.in;
+ }
+
+ @Override
+ public OutputBuffer getOutputBuffer() {
+ return this.out;
+ }
+
+ @Override
+ public void loadData() throws IOException {
+ nativeLoadData(nativeHandlerAddr);
+ //
+ // return call(Command.CMD_LOAD, param);
+ }
+
+ @Override
+ public void setDataReceiver(DataReceiver handler) {
+ this.dataReceiver = handler;
+ }
+
+ @Override
+ public String name() {
+ return nativeHandlerName;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A NativeDataSource loads data from upstream.
+ */
+public interface NativeDataSource {
+
+  /**
+   * @return the input buffer
+   */
+  InputBuffer getInputBuffer();
+
+  /**
+   * Sets the listener that is activated when data from upstream arrives.
+   *
+   * @param handler the listener to activate
+   */
+  void setDataReceiver(DataReceiver handler);
+
+  /**
+   * Loads data from upstream.
+   *
+   * @throws IOException if loading fails
+   */
+  void loadData() throws IOException;
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+
+/**
+ * A data target that sends data to downstream.
+ */
+public interface NativeDataTarget {
+
+  /**
+   * Sends a signal to indicate that the data has been stored in the output
+   * buffer.
+   *
+   * @throws IOException if the signal cannot be sent
+   */
+  void sendData() throws IOException;
+
+  /**
+   * Sends a signal that there is no more data.
+   *
+   * @throws IOException if the signal cannot be sent
+   */
+  void finishSendData() throws IOException;
+
+  /**
+   * Gets the output buffer of this data target.
+   *
+   * @return the output buffer
+   */
+  OutputBuffer getOutputBuffer();
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputCollector;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * native map output collector wrapped in Java interface
+ */
+public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> {
+
+ private static Log LOG = LogFactory.getLog(NativeMapOutputCollectorDelegator.class);
+ private JobConf job;
+ private NativeCollectorOnlyHandler<K, V> handler;
+
+ private StatusReportChecker updater;
+
+ @Override
+ public void collect(K key, V value, int partition) throws IOException, InterruptedException {
+ handler.collect(key, value, partition);
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException {
+ handler.close();
+ if (null != updater) {
+ updater.stop();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException, InterruptedException, ClassNotFoundException {
+ handler.flush();
+ }
+
+ @Override
+ public void init(Context context) throws IOException, ClassNotFoundException {
+ this.job = context.getJobConf();
+
+ Platforms.init(job);
+
+ if (job.getNumReduceTasks() == 0) {
+ String message = "There is no reducer, no need to use native output collector";
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+
+ Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class);
+ if (comparatorClass != null && !Platforms.define(comparatorClass)) {
+ String message = "Native output collector don't support customized java comparator "
+ + job.get(MRJobConfig.KEY_COMPARATOR);
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+
+ if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false) == true) {
+ if (!isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) {
+ String message = "Native output collector don't support compression codec "
+ + job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support Gzip, Lz4, snappy";
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+ }
+
+ if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
+ String message = "Native-Task don't support sort class " + job.get(Constants.MAP_SORT_CLASS);
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+
+ if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false) == true) {
+ String message = "Native-Task don't support secure shuffle";
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+
+ final Class<?> keyCls = job.getMapOutputKeyClass();
+ try {
+ @SuppressWarnings("rawtypes")
+ final INativeSerializer serializer = NativeSerialization.getInstance().getSerializer(keyCls);
+ if (null == serializer) {
+ String message = "Key type not supported. Cannot find serializer for " + keyCls.getName();
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ } else if (!Platforms.support(keyCls.getName(), serializer, job)) {
+ String message = "Native output collector don't support this key, this key is not comparable in native "
+ + keyCls.getName();
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+ } catch (final IOException e) {
+ String message = "Cannot find serializer for " + keyCls.getName();
+ LOG.error(message);
+ throw new IOException(message);
+ }
+
+ final boolean ret = NativeRuntime.isNativeLibraryLoaded();
+ if (ret) {
+ NativeRuntime.configure(job);
+
+ final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
+ Constants.NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL);
+ updater = new StatusReportChecker(context.getReporter(), updateInterval);
+ updater.start();
+
+ } else {
+ String message = "Nativeruntime cannot be loaded, please check the libnativetask.so is in hadoop library dir";
+ LOG.error(message);
+ throw new InvalidJobConfException(message);
+ }
+
+ this.handler = null;
+ try {
+ final Class<K> oKClass = (Class<K>) job.getMapOutputKeyClass();
+ final Class<K> oVClass = (Class<K>) job.getMapOutputValueClass();
+ final TaskAttemptID id = context.getMapTask().getTaskID();
+ final TaskContext taskContext = new TaskContext(job, null, null, oKClass, oVClass,
+ context.getReporter(), id);
+ handler = NativeCollectorOnlyHandler.create(taskContext);
+ } catch (final IOException e) {
+ String message = "Native output collector cannot be loaded;";
+ LOG.error(message);
+ throw new IOException(message, e);
+ }
+
+ LOG.info("Native output collector can be successfully enabled!");
+ }
+
+ private boolean isCodecSupported(String string) {
+ if ("org.apache.hadoop.io.compress.SnappyCodec".equals(string)
+ || "org.apache.hadoop.io.compress.GzipCodec".equals(string)
+ || "org.apache.hadoop.io.compress.Lz4Codec".equals(string)) {
+ return true;
+ }
+ return false;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+import org.apache.hadoop.mapred.nativetask.util.SnappyUtil;
+import org.apache.hadoop.util.VersionInfo;
+
+/**
+ * This class stands for the native runtime. The functions visible in this class are:
+ * <ol>
+ * <li>create and release native objects, which are used as native handlers</li>
+ * <li>configure the native task with the provided MR configs</li>
+ * <li>register a customized library</li>
+ * <li>fetch the status report from native space and apply it to a reporter</li>
+ * </ol>
+ * NOTE(review): the original description also listed a file system API for native space;
+ * no such method is present in this class - confirm whether it lives elsewhere.
+ */
+public class NativeRuntime {
+  private static final Log LOG = LogFactory.getLog(NativeRuntime.class);
+  private static boolean nativeLibraryLoaded = false;
+
+  private static Configuration conf = new Configuration();
+
+  static {
+    try {
+      if (!SnappyUtil.isNativeSnappyLoaded(conf)) {
+        throw new IOException("Snappy library cannot be loaded");
+      }
+      LOG.info("Snappy native library is available");
+      System.loadLibrary("nativetask");
+      LOG.info("Nativetask JNI library loaded.");
+      nativeLibraryLoaded = true;
+    } catch (final Throwable t) {
+      // Loading is best effort: the failure is recorded in nativeLibraryLoaded (left false)
+      // and reported through the log, including the stack trace for diagnosis.
+      LOG.error("Failed to load nativetask JNI library with error: " + t, t);
+      LOG.info("java.library.path=" + System.getProperty("java.library.path"));
+      LOG.info("LD_LIBRARY_PATH=" + System.getenv("LD_LIBRARY_PATH"));
+    }
+  }
+
+  /**
+   * Fails fast when the native library was not loaded by the static initializer.
+   *
+   * @throws RuntimeException if the native library is not loaded
+   */
+  private static void assertNativeLibraryLoaded() {
+    if (!nativeLibraryLoaded) {
+      throw new RuntimeException("Native runtime library not loaded");
+    }
+  }
+
+  /**
+   * @return {@code true} if the native library was loaded successfully
+   */
+  public static boolean isNativeLibraryLoaded() {
+    return nativeLibraryLoaded;
+  }
+
+  /**
+   * Configures the native runtime with a copy of the given configuration, to which the
+   * Hadoop version is added.
+   *
+   * @param jobConf configuration to copy and hand over to native space
+   */
+  public static void configure(Configuration jobConf) {
+    assertNativeLibraryLoaded();
+    conf = new Configuration(jobConf);
+    conf.set(Constants.NATIVE_HADOOP_VERSION, VersionInfo.getVersion());
+    JNIConfigure(ConfigUtil.toBytes(conf));
+  }
+
+  /**
+   * Creates a native object. We use it to create native handlers.
+   *
+   * @param clazz name of the native class to instantiate
+   * @return the value returned by native space; {@code 0} is logged as a failure
+   */
+  public synchronized static long createNativeObject(String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNICreateNativeObject(BytesUtil.toBytes(clazz));
+    if (ret == 0) {
+      LOG.warn("Can't create NativeObject for class " + clazz + ", probably not exist.");
+    }
+    return ret;
+  }
+
+  /**
+   * Registers a customized library.
+   *
+   * @param libraryName library passed to native space as the module path
+   * @param clazz name passed to native space as the module name
+   * @return the value returned by native space; a non-zero value is logged as a failure
+   */
+  public synchronized static long registerLibrary(String libraryName, String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNIRegisterModule(BytesUtil.toBytes(libraryName), BytesUtil.toBytes(clazz));
+    if (ret != 0) {
+      LOG.warn("Can't register library " + libraryName + " for class " + clazz
+          + ", probably not exist.");
+    }
+    return ret;
+  }
+
+  /**
+   * Destroys a native object. We use it to destroy native handlers.
+   *
+   * @param addr value previously returned by {@link #createNativeObject(String)}
+   */
+  public synchronized static void releaseNativeObject(long addr) {
+    assertNativeLibraryLoaded();
+    JNIReleaseNativeObject(addr);
+  }
+
+  /**
+   * Gets the status report from native space and applies it to the reporter.
+   *
+   * <p>The report is decoded in this order: progress (float), status (Text, applied only
+   * when non-empty), number of counters (int) and, per counter, group (Text), name (Text)
+   * and increment (long).
+   *
+   * @param reporter reporter that receives progress, status and counter increments
+   * @throws IOException if the report cannot be decoded
+   */
+  public static void reportStatus(TaskReporter reporter) throws IOException {
+    assertNativeLibraryLoaded();
+    synchronized (reporter) {
+      final byte[] statusBytes = JNIUpdateStatus();
+      final DataInputBuffer ib = new DataInputBuffer();
+      ib.reset(statusBytes, statusBytes.length);
+      final FloatWritable progress = new FloatWritable();
+      progress.readFields(ib);
+      reporter.setProgress(progress.get());
+      final Text status = new Text();
+      status.readFields(ib);
+      if (status.getLength() > 0) {
+        reporter.setStatus(status.toString());
+      }
+      final IntWritable numCounters = new IntWritable();
+      numCounters.readFields(ib);
+      if (numCounters.get() == 0) {
+        return;
+      }
+      // The Writable holders are reused for every counter entry.
+      final Text group = new Text();
+      final Text name = new Text();
+      final LongWritable amount = new LongWritable();
+      for (int i = 0; i < numCounters.get(); i++) {
+        group.readFields(ib);
+        name.readFields(ib);
+        amount.readFields(ib);
+        reporter.incrCounter(group.toString(), name.toString(), amount.get());
+      }
+    }
+  }
+
+
+  /*******************************************************
+   *** The following are JNI Apis
+   ********************************************************/
+
+  /**
+   * Config the native runtime with mapreduce job configurations.
+   *
+   * @param configs configuration encoded by {@code ConfigUtil.toBytes}
+   */
+  private native static void JNIConfigure(byte[][] configs);
+
+  /**
+   * Create a native object in native space.
+   *
+   * @param clazz encoded name of the native class
+   * @return the value identifying the created object; callers treat {@code 0} as failure
+   */
+  private native static long JNICreateNativeObject(byte[] clazz);
+
+  /**
+   * Create the default native object for a certain type.
+   *
+   * @param type encoded type name
+   * @return the value identifying the created object
+   */
+  @Deprecated
+  private native static long JNICreateDefaultNativeObject(byte[] type);
+
+  /**
+   * Destroy a native object in native space.
+   *
+   * @param addr value identifying the native object
+   */
+  private native static void JNIReleaseNativeObject(long addr);
+
+  /**
+   * Get the status update from the native side. Encoding: progress:float, status:Text,
+   * counter number:int (the count of the counters), counters: array of
+   * [group:Text, name:Text, incrCount:Long].
+   *
+   * @return the encoded status report
+   */
+  private native static byte[] JNIUpdateStatus();
+
+  /**
+   * Not used.
+   */
+  private native static void JNIRelease();
+
+  /**
+   * Register a module in native space. Used by {@link #registerLibrary(String, String)}.
+   *
+   * @param path encoded library name
+   * @param name encoded class name
+   * @return a status value; callers treat a non-zero value as failure
+   */
+  private native static int JNIRegisterModule(byte[] path, byte[] name);
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+
+/**
+ * Base class for platforms.
+ *
+ * <p>A platform is a framework running on top of MapReduce, like Hadoop, Hive,
+ * Pig or Mahout. Each framework defines its own key type and value type across
+ * a MapReduce job. For each platform we should implement serializers, so that
+ * data can be communicated with the native side, and native comparators, so
+ * that the native output collectors can sort the data and write it out.
+ *
+ * <p>{@link HadoopPlatform} is already provided and supports all key types of
+ * Hadoop; users can implement their own custom platform.
+ */
+public abstract class Platform {
+  private final NativeSerialization serialization;
+  protected Set<String> keyClassNames = new HashSet<String>();
+
+  /**
+   * Creates a platform bound to the shared {@link NativeSerialization} instance.
+   */
+  public Platform() {
+    serialization = NativeSerialization.getInstance();
+  }
+
+  /**
+   * Initializes the platform; this is where {@link #registerKey} should be called.
+   *
+   * @throws IOException if the initialization fails
+   */
+  public abstract void init() throws IOException;
+
+  /**
+   * @return the name of the platform, useful for logs and debugging
+   */
+  public abstract String name();
+
+
+  /**
+   * Associates a key class with its serializer and with this platform.
+   *
+   * @param keyClassName map output key class name
+   * @param key key serializer class
+   * @throws IOException if the serializer cannot be registered
+   */
+  protected void registerKey(String keyClassName, Class key) throws IOException {
+    // Register first: the key is only recorded for this platform once that succeeded.
+    this.serialization.register(keyClassName, key);
+    this.keyClassNames.add(keyClassName);
+  }
+
+  /**
+   * Tells whether the platform supports a specific key. At least two conditions
+   * should be satisfied:
+   *
+   * <ol>
+   * <li>the key belongs to the platform</li>
+   * <li>the associated serializer implements the {@link INativeComparable} interface</li>
+   * </ol>
+   *
+   * @param keyClassName map output key class name
+   * @param serializer serializer associated with the key via registerKey
+   * @param job job configuration
+   * @return true if the platform has implemented native comparators of the key and
+   *         false otherwise
+   */
+  protected abstract boolean support(String keyClassName, INativeSerializer serializer, JobConf job);
+
+
+  /**
+   * Tells whether it is this platform that has defined a custom Java comparator.
+   *
+   * <p>NativeTask doesn't support a custom Java comparator (set with
+   * mapreduce.job.output.key.comparator.class), but a platform (e.g. Pig) could also
+   * set that conf and implement native comparators, so we shouldn't bail out.
+   *
+   * @param keyComparator comparator set with mapreduce.job.output.key.comparator.class
+   * @return true if the comparator is defined by this platform and false otherwise
+   */
+  protected abstract boolean define(Class keyComparator);
+}