Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [8/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/gtest/gtest_main.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/gtest/gtest_main.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/gtest/gtest_main.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/gtest/gtest_main.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,39 @@
+// Copyright 2006, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include <iostream>
+
+#include "gtest/gtest.h"
+
+GTEST_API_ int main(int argc, char **argv) {
+ std::cout << "Running main() from gtest_main.cc\n";
+
+ testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
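gtest_main.cc above only supplies the main() entry point; a test binary links it
together with translation units that define the tests themselves. A minimal
sketch of such a companion file (file and test names here are illustrative, not
part of this commit):

    // sample_test.cc - linked together with gtest_main.cc and the gtest library
    #include "gtest/gtest.h"

    TEST(SampleTest, BasicAssertions) {
      EXPECT_EQ(4, 2 + 2);          // non-fatal: the test continues on failure
      ASSERT_STREQ("ok", "ok");     // fatal: aborts this test on failure
    }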
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.c?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.c (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.c Thu Jul 17 17:44:55 2014
@@ -0,0 +1,740 @@
+/*
+ LZ4 - Fast LZ compression algorithm
+ Copyright (C) 2011-2012, Yann Collet.
+ BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are
+ met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following disclaimer
+ in the documentation and/or other materials provided with the
+ distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+//**************************************
+// Compilation Directives
+//**************************************
+#if __STDC_VERSION__ >= 199901L
+ /* "restrict" is a known keyword */
+#else
+#define restrict // Disable restrict
+#endif
+
+#ifdef _MSC_VER
+#define inline __forceinline
+#endif
+
+#ifdef __GNUC__
+#define _PACKED __attribute__ ((packed))
+#else
+#define _PACKED
+#endif
+
+#if (__x86_64__ || __ppc64__ || _WIN64 || __LP64__) // Detect 64 bits mode
+#define ARCH64 1
+#else
+#define ARCH64 0
+#endif
+
+
+//**************************************
+// Includes
+//**************************************
+#include <stdlib.h> // for malloc
+#include <string.h> // for memset
+#include "lz4.h"
+
+
+//**************************************
+// Performance parameter
+//**************************************
+// Increasing this value improves compression ratio
+// Lowering this value reduces memory usage
+// Lowering may also improve speed, typically on reaching cache size limits (L1 32KB for Intel, 64KB for AMD)
+// Memory usage formula for 32 bits systems : N->2^(N+2) Bytes (examples : 17 -> 512KB ; 12 -> 16KB)
+#define HASH_LOG 12
+
+//#define _FORCE_SW_BITCOUNT // Uncomment for better performance if target platform has no hardware support for LowBitCount
+
+
+//**************************************
+// Basic Types
+//**************************************
+#if defined(_MSC_VER) // Visual Studio does not support 'stdint' natively
+#define BYTE unsigned __int8
+#define U16 unsigned __int16
+#define U32 unsigned __int32
+#define S32 __int32
+#define U64 unsigned __int64
+#else
+#include <stdint.h>
+#define BYTE uint8_t
+#define U16 uint16_t
+#define U32 uint32_t
+#define S32 int32_t
+#define U64 uint64_t
+#endif
+
+
+//**************************************
+// Constants
+//**************************************
+#define MINMATCH 4
+#define SKIPSTRENGTH 6
+#define STACKLIMIT 13
+#define HEAPMODE (HASH_LOG>STACKLIMIT) // Defines if memory is allocated into the stack (local variable), or into the heap (malloc()).
+#define COPYLENGTH 8
+#define LASTLITERALS 5
+#define MFLIMIT (COPYLENGTH+MINMATCH)
+#define MINLENGTH (MFLIMIT+1)
+
+#define MAXD_LOG 16
+#define MAX_DISTANCE ((1 << MAXD_LOG) - 1)
+
+#define HASHTABLESIZE (1 << HASH_LOG)
+#define HASH_MASK (HASHTABLESIZE - 1)
+
+#define ML_BITS 4
+#define ML_MASK ((1U<<ML_BITS)-1)
+#define RUN_BITS (8-ML_BITS)
+#define RUN_MASK ((1U<<RUN_BITS)-1)
+
+
+//**************************************
+// Local structures
+//**************************************
+struct refTables
+{
+ const BYTE* hashTable[HASHTABLESIZE];
+};
+
+typedef struct _U64_S
+{
+ U64 v;
+} _PACKED U64_S;
+
+typedef struct _U32_S
+{
+ U32 v;
+} _PACKED U32_S;
+
+typedef struct _U16_S
+{
+ U16 v;
+} _PACKED U16_S;
+
+#define A64(x) (((U64_S *)(x))->v)
+#define A32(x) (((U32_S *)(x))->v)
+#define A16(x) (((U16_S *)(x))->v)
+
+
+//**************************************
+// Architecture-specific macros
+//**************************************
+#if ARCH64 // 64-bit
+#define STEPSIZE 8
+#define UARCH U64
+#define AARCH A64
+#define LZ4_COPYSTEP(s,d) A64(d) = A64(s); d+=8; s+=8;
+#define LZ4_COPYPACKET(s,d) LZ4_COPYSTEP(s,d)
+#define LZ4_SECURECOPY(s,d,e) if (d<e) LZ4_WILDCOPY(s,d,e)
+#define HTYPE U32
+#define INITBASE(base) const BYTE* const base = ip
+#else // 32-bit
+#define STEPSIZE 4
+#define UARCH U32
+#define AARCH A32
+#define LZ4_COPYSTEP(s,d) A32(d) = A32(s); d+=4; s+=4;
+#define LZ4_COPYPACKET(s,d) LZ4_COPYSTEP(s,d); LZ4_COPYSTEP(s,d);
+#define LZ4_SECURECOPY LZ4_WILDCOPY
+#define HTYPE const BYTE*
+#define INITBASE(base) const int base = 0
+#endif
+
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+#define LZ4_READ_LITTLEENDIAN_16(d,s,p) { d = (s) - A16(p); }
+#define LZ4_WRITE_LITTLEENDIAN_16(p,v) { A16(p) = v; p+=2; }
+#define LZ4_NbCommonBytes LZ4_NbCommonBytes_LittleEndian
+#else // Big Endian
+#define LZ4_READ_LITTLEENDIAN_16(d,s,p) { int delta = p[0]; delta += p[1] << 8; d = (s) - delta; }
+#define LZ4_WRITE_LITTLEENDIAN_16(p,v) { int delta = v; *p++ = delta; *p++ = delta>>8; }
+#define LZ4_NbCommonBytes LZ4_NbCommonBytes_BigEndian
+#endif
+
+
+//**************************************
+// Macros
+//**************************************
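+// LZ4_HASH_FUNCTION is a multiplicative (Fibonacci) hash: the first 4 bytes at
+// a position are multiplied by 2654435761 (a prime close to 2^32/phi) and the
+// top HASH_LOG bits of the 32-bit product are kept as the table index.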
+#define LZ4_HASH_FUNCTION(i) (((i) * 2654435761U) >> ((MINMATCH*8)-HASH_LOG))
+#define LZ4_HASH_VALUE(p) LZ4_HASH_FUNCTION(A32(p))
+#define LZ4_WILDCOPY(s,d,e) do { LZ4_COPYPACKET(s,d) } while (d<e);
+#define LZ4_BLINDCOPY(s,d,l) { BYTE* e=(d)+l; LZ4_WILDCOPY(s,d,e); d=e; }
+
+
+//****************************
+// Private functions
+//****************************
+#if ARCH64
+
+inline static int LZ4_NbCommonBytes_LittleEndian (register U64 val)
+{
+ #if defined(_MSC_VER) && !defined(_FORCE_SW_BITCOUNT)
+ unsigned long r = 0;
+ _BitScanForward64( &r, val );
+ return (int)(r>>3);
+ #elif defined(__GNUC__) && !defined(_FORCE_SW_BITCOUNT)
+ return (__builtin_ctzll(val) >> 3);
+ #else
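+  // Software fallback: (val & -val) isolates the lowest set bit; multiplying
+  // by a De Bruijn constant makes the top 6 bits of the product unique per
+  // bit position, and the table maps that index straight to ctz(val)>>3.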
+ static const int DeBruijnBytePos[64] = { 0, 0, 0, 0, 0, 1, 1, 2, 0, 3, 1, 3, 1, 4, 2, 7, 0, 2, 3, 6, 1, 5, 3, 5, 1, 3, 4, 4, 2, 5, 6, 7, 7, 0, 1, 2, 3, 3, 4, 6, 2, 6, 5, 5, 3, 4, 5, 6, 7, 1, 2, 4, 6, 4, 4, 5, 7, 2, 6, 5, 7, 6, 7, 7 };
+ return DeBruijnBytePos[((U64)((val & -val) * 0x0218A392CDABBD3F)) >> 58];
+ #endif
+}
+
+inline static int LZ4_NbCommonBytes_BigEndian (register U64 val)
+{
+ #if defined(_MSC_VER) && !defined(_FORCE_SW_BITCOUNT)
+ unsigned long r = 0;
+ _BitScanReverse64( &r, val );
+ return (int)(r>>3);
+ #elif defined(__GNUC__) && !defined(_FORCE_SW_BITCOUNT)
+ return (__builtin_clzll(val) >> 3);
+ #else
+ int r;
+ if (!(val>>32)) { r=4; } else { r=0; val>>=32; }
+ if (!(val>>16)) { r+=2; val>>=8; } else { val>>=24; }
+ r += (!val);
+ return r;
+ #endif
+}
+
+#else
+
+inline static int LZ4_NbCommonBytes_LittleEndian (register U32 val)
+{
+ #if defined(_MSC_VER) && !defined(_FORCE_SW_BITCOUNT)
+ unsigned long r = 0;
+ _BitScanForward( &r, val );
+ return (int)(r>>3);
+ #elif defined(__GNUC__) && !defined(_FORCE_SW_BITCOUNT)
+ return (__builtin_ctz(val) >> 3);
+ #else
+ static const int DeBruijnBytePos[32] = { 0, 0, 3, 0, 3, 1, 3, 0, 3, 2, 2, 1, 3, 2, 0, 1, 3, 3, 1, 2, 2, 2, 2, 0, 3, 1, 2, 0, 1, 0, 1, 1 };
+ return DeBruijnBytePos[((U32)((val & -val) * 0x077CB531U)) >> 27];
+ #endif
+}
+
+inline static int LZ4_NbCommonBytes_BigEndian (register U32 val)
+{
+ #if defined(_MSC_VER) && !defined(_FORCE_SW_BITCOUNT)
+ unsigned long r = 0;
+ _BitScanReverse( &r, val );
+ return (int)(r>>3);
+ #elif defined(__GNUC__) && !defined(_FORCE_SW_BITCOUNT)
+ return (__builtin_clz(val) >> 3);
+ #else
+ int r;
+ if (!(val>>16)) { r=2; val>>=8; } else { r=0; val>>=24; }
+ r += (!val);
+ return r;
+ #endif
+}
+
+#endif
+
+
+//******************************
+// Public Compression functions
+//******************************
+
+int LZ4_compressCtx(void** ctx,
+ char* source,
+ char* dest,
+ int isize)
+{
+#if HEAPMODE
+ struct refTables *srt = (struct refTables *) (*ctx);
+ HTYPE* HashTable;
+#else
+ HTYPE HashTable[HASHTABLESIZE] = {0};
+#endif
+
+ const BYTE* ip = (BYTE*) source;
+ INITBASE(base);
+ const BYTE* anchor = ip;
+ const BYTE* const iend = ip + isize;
+ const BYTE* const mflimit = iend - MFLIMIT;
+#define matchlimit (iend - LASTLITERALS)
+
+ BYTE* op = (BYTE*) dest;
+
+ int len, length;
+ const int skipStrength = SKIPSTRENGTH;
+ U32 forwardH;
+
+
+ // Init
+ if (isize<MINLENGTH) goto _last_literals;
+#if HEAPMODE
+ if (*ctx == NULL)
+ {
+ srt = (struct refTables *) malloc ( sizeof(struct refTables) );
+ *ctx = (void*) srt;
+ }
+ HashTable = (HTYPE*)(srt->hashTable);
+ memset((void*)HashTable, 0, sizeof(srt->hashTable));
+#else
+ (void) ctx;
+#endif
+
+
+ // First Byte
+ HashTable[LZ4_HASH_VALUE(ip)] = ip - base;
+ ip++; forwardH = LZ4_HASH_VALUE(ip);
+
+ // Main Loop
+ for ( ; ; )
+ {
+ int findMatchAttempts = (1U << skipStrength) + 3;
+ const BYTE* forwardIp = ip;
+ const BYTE* ref;
+ BYTE* token;
+
+ // Find a match
+ do {
+ U32 h = forwardH;
+ int step = findMatchAttempts++ >> skipStrength;
+ ip = forwardIp;
+ forwardIp = ip + step;
+
+ if (forwardIp > mflimit) { goto _last_literals; }
+
+ forwardH = LZ4_HASH_VALUE(forwardIp);
+ ref = base + HashTable[h];
+ HashTable[h] = ip - base;
+
+ } while ((ref < ip - MAX_DISTANCE) || (A32(ref) != A32(ip)));
+
+ // Catch up
+ while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; }
+
+ // Encode Literal length
+ length = ip - anchor;
+ token = op++;
+ if (length>=(int)RUN_MASK) { *token=(RUN_MASK<<ML_BITS); len = length-RUN_MASK; for(; len > 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; }
+ else *token = (length<<ML_BITS);
+
+ // Copy Literals
+ LZ4_BLINDCOPY(anchor, op, length);
+
+_next_match:
+ // Encode Offset
+ LZ4_WRITE_LITTLEENDIAN_16(op,ip-ref);
+
+ // Start Counting
+ ip+=MINMATCH; ref+=MINMATCH; // MinMatch verified
+ anchor = ip;
+ while (ip<matchlimit-(STEPSIZE-1))
+ {
+ UARCH diff = AARCH(ref) ^ AARCH(ip);
+ if (!diff) { ip+=STEPSIZE; ref+=STEPSIZE; continue; }
+ ip += LZ4_NbCommonBytes(diff);
+ goto _endCount;
+ }
+ if (ARCH64) if ((ip<(matchlimit-3)) && (A32(ref) == A32(ip))) { ip+=4; ref+=4; }
+ if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; }
+ if ((ip<matchlimit) && (*ref == *ip)) ip++;
+_endCount:
+
+ // Encode MatchLength
+ len = (ip - anchor);
+ if (len>=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; }
+ else *token += len;
+
+ // Test end of chunk
+ if (ip > mflimit) { anchor = ip; break; }
+
+ // Fill table
+ HashTable[LZ4_HASH_VALUE(ip-2)] = ip - 2 - base;
+
+ // Test next position
+ ref = base + HashTable[LZ4_HASH_VALUE(ip)];
+ HashTable[LZ4_HASH_VALUE(ip)] = ip - base;
+ if ((ref > ip - (MAX_DISTANCE + 1)) && (A32(ref) == A32(ip))) { token = op++; *token=0; goto _next_match; }
+
+ // Prepare next loop
+ anchor = ip++;
+ forwardH = LZ4_HASH_VALUE(ip);
+ }
+
+_last_literals:
+ // Encode Last Literals
+ {
+ int lastRun = iend - anchor;
+ if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK<<ML_BITS); lastRun-=RUN_MASK; for(; lastRun > 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; }
+ else *op++ = (lastRun<<ML_BITS);
+ memcpy(op, anchor, iend - anchor);
+ op += iend-anchor;
+ }
+
+ // End
+ return (int) (((char*)op)-dest);
+}
+
+
+
+// Note : this function is valid only if isize < LZ4_64KLIMIT
+#define LZ4_64KLIMIT ((1<<16) + (MFLIMIT-1))
+#define HASHLOG64K (HASH_LOG+1)
+#define HASH64KTABLESIZE (1U<<HASHLOG64K)
+#define LZ4_HASH64K_FUNCTION(i) (((i) * 2654435761U) >> ((MINMATCH*8)-HASHLOG64K))
+#define LZ4_HASH64K_VALUE(p) LZ4_HASH64K_FUNCTION(A32(p))
+int LZ4_compress64kCtx(void** ctx,
+ char* source,
+ char* dest,
+ int isize)
+{
+#if HEAPMODE
+ struct refTables *srt = (struct refTables *) (*ctx);
+ U16* HashTable;
+#else
+ U16 HashTable[HASH64KTABLESIZE] = {0};
+#endif
+
+ const BYTE* ip = (BYTE*) source;
+ const BYTE* anchor = ip;
+ const BYTE* const base = ip;
+ const BYTE* const iend = ip + isize;
+ const BYTE* const mflimit = iend - MFLIMIT;
+#define matchlimit (iend - LASTLITERALS)
+
+ BYTE* op = (BYTE*) dest;
+
+ int len, length;
+ const int skipStrength = SKIPSTRENGTH;
+ U32 forwardH;
+
+
+ // Init
+ if (isize<MINLENGTH) goto _last_literals;
+#if HEAPMODE
+ if (*ctx == NULL)
+ {
+ srt = (struct refTables *) malloc ( sizeof(struct refTables) );
+ *ctx = (void*) srt;
+ }
+ HashTable = (U16*)(srt->hashTable);
+ memset((void*)HashTable, 0, sizeof(srt->hashTable));
+#else
+ (void) ctx;
+#endif
+
+
+ // First Byte
+ ip++; forwardH = LZ4_HASH64K_VALUE(ip);
+
+ // Main Loop
+ for ( ; ; )
+ {
+ int findMatchAttempts = (1U << skipStrength) + 3;
+ const BYTE* forwardIp = ip;
+ const BYTE* ref;
+ BYTE* token;
+
+ // Find a match
+ do {
+ U32 h = forwardH;
+ int step = findMatchAttempts++ >> skipStrength;
+ ip = forwardIp;
+ forwardIp = ip + step;
+
+ if (forwardIp > mflimit) { goto _last_literals; }
+
+ forwardH = LZ4_HASH64K_VALUE(forwardIp);
+ ref = base + HashTable[h];
+ HashTable[h] = ip - base;
+
+ } while (A32(ref) != A32(ip));
+
+ // Catch up
+ while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; }
+
+ // Encode Literal length
+ length = ip - anchor;
+ token = op++;
+ if (length>=(int)RUN_MASK) { *token=(RUN_MASK<<ML_BITS); len = length-RUN_MASK; for(; len > 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; }
+ else *token = (length<<ML_BITS);
+
+ // Copy Literals
+ LZ4_BLINDCOPY(anchor, op, length);
+
+_next_match:
+ // Encode Offset
+ LZ4_WRITE_LITTLEENDIAN_16(op,ip-ref);
+
+ // Start Counting
+ ip+=MINMATCH; ref+=MINMATCH; // MinMatch verified
+ anchor = ip;
+ while (ip<matchlimit-(STEPSIZE-1))
+ {
+ UARCH diff = AARCH(ref) ^ AARCH(ip);
+ if (!diff) { ip+=STEPSIZE; ref+=STEPSIZE; continue; }
+ ip += LZ4_NbCommonBytes(diff);
+ goto _endCount;
+ }
+ if (ARCH64) if ((ip<(matchlimit-3)) && (A32(ref) == A32(ip))) { ip+=4; ref+=4; }
+ if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; }
+ if ((ip<matchlimit) && (*ref == *ip)) ip++;
+_endCount:
+
+ // Encode MatchLength
+ len = (ip - anchor);
+ if (len>=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; }
+ else *token += len;
+
+ // Test end of chunk
+ if (ip > mflimit) { anchor = ip; break; }
+
+ // Fill table
+ HashTable[LZ4_HASH64K_VALUE(ip-2)] = ip - 2 - base;
+
+ // Test next position
+ ref = base + HashTable[LZ4_HASH64K_VALUE(ip)];
+ HashTable[LZ4_HASH64K_VALUE(ip)] = ip - base;
+ if (A32(ref) == A32(ip)) { token = op++; *token=0; goto _next_match; }
+
+ // Prepare next loop
+ anchor = ip++;
+ forwardH = LZ4_HASH64K_VALUE(ip);
+ }
+
+_last_literals:
+ // Encode Last Literals
+ {
+ int lastRun = iend - anchor;
+ if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK<<ML_BITS); lastRun-=RUN_MASK; for(; lastRun > 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; }
+ else *op++ = (lastRun<<ML_BITS);
+ memcpy(op, anchor, iend - anchor);
+ op += iend-anchor;
+ }
+
+ // End
+ return (int) (((char*)op)-dest);
+}
+
+
+
+int LZ4_compress(char* source,
+ char* dest,
+ int isize)
+{
+#if HEAPMODE
+ void* ctx = malloc(sizeof(struct refTables));
+ int result;
+ if (isize < LZ4_64KLIMIT)
+ result = LZ4_compress64kCtx(&ctx, source, dest, isize);
+ else result = LZ4_compressCtx(&ctx, source, dest, isize);
+ free(ctx);
+ return result;
+#else
+ if (isize < (int)LZ4_64KLIMIT) return LZ4_compress64kCtx(NULL, source, dest, isize);
+ return LZ4_compressCtx(NULL, source, dest, isize);
+#endif
+}
+
+
+
+
+//****************************
+// Decompression functions
+//****************************
+
+// Note : The decoding functions LZ4_uncompress() and LZ4_uncompress_unknownOutputSize()
+// are safe against "buffer overflow" attack type.
+// They will never write nor read outside of the provided input and output buffers.
+// A corrupted input will produce an error result, a negative int, indicating the position of the error within input stream.
+
+int LZ4_uncompress(char* source,
+ char* dest,
+ int osize)
+{
+ // Local Variables
+ const BYTE* restrict ip = (const BYTE*) source;
+ const BYTE* restrict ref;
+
+ BYTE* restrict op = (BYTE*) dest;
+ BYTE* const oend = op + osize;
+ BYTE* cpy;
+
+ BYTE token;
+
+ int len, length;
+ size_t dec[] ={0, 3, 2, 3, 0, 0, 0, 0};
+
+
+ // Main Loop
+ while (1)
+ {
+ // get runlength
+ token = *ip++;
+ if ((length=(token>>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; }
+
+ // copy literals
+ cpy = op+length;
+ if (cpy>oend-COPYLENGTH)
+ {
+ if (cpy > oend) goto _output_error;
+ memcpy(op, ip, length);
+ ip += length;
+ break; // Necessarily EOF
+ }
+ LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy;
+
+ // get offset
+ LZ4_READ_LITTLEENDIAN_16(ref,cpy,ip); ip+=2;
+ if (ref < (BYTE* const)dest) goto _output_error;
+
+ // get matchlength
+ if ((length=(token&ML_MASK)) == ML_MASK) { for (;*ip==255;length+=255) {ip++;} length += *ip++; }
+
+ // copy repeated sequence
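+  // When the match offset (op-ref) is smaller than the copy step, source and
+  // destination overlap: the first four bytes are copied one at a time and the
+  // dec/dec2 adjustment tables realign ref before the wide copies that follow.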
+ if (op-ref<STEPSIZE)
+ {
+#if ARCH64
+ size_t dec2table[]={0, 4, 4, 3, 4, 5, 6, 7};
+ size_t dec2 = dec2table[op-ref];
+#else
+ const int dec2 = 0;
+#endif
+ *op++ = *ref++;
+ *op++ = *ref++;
+ *op++ = *ref++;
+ *op++ = *ref++;
+ ref -= dec[op-ref];
+ A32(op)=A32(ref); op += STEPSIZE-4; ref += STEPSIZE-4;
+ ref -= dec2;
+ } else { LZ4_COPYSTEP(ref,op); }
+ cpy = op + length - (STEPSIZE-4);
+ if (cpy>oend-COPYLENGTH)
+ {
+ if (cpy > oend) goto _output_error;
+ LZ4_SECURECOPY(ref, op, (oend-COPYLENGTH));
+ while(op<cpy) *op++=*ref++;
+ op=cpy;
+ if (op == oend) break; // Check EOF (should never happen, since last 5 bytes are supposed to be literals)
+ continue;
+ }
+ LZ4_SECURECOPY(ref, op, cpy);
+ op=cpy; // correction
+ }
+
+ // end of decoding
+ return (int) (((char*)ip)-source);
+
+ // write overflow error detected
+_output_error:
+ return (int) (-(((char*)ip)-source));
+}
+
+
+int LZ4_uncompress_unknownOutputSize(
+ char* source,
+ char* dest,
+ int isize,
+ int maxOutputSize)
+{
+ // Local Variables
+ const BYTE* restrict ip = (const BYTE*) source;
+ const BYTE* const iend = ip + isize;
+ const BYTE* restrict ref;
+
+ BYTE* restrict op = (BYTE*) dest;
+ BYTE* const oend = op + maxOutputSize;
+ BYTE* cpy;
+
+ BYTE token;
+
+ int len, length;
+ size_t dec[] ={0, 3, 2, 3, 0, 0, 0, 0};
+
+
+ // Main Loop
+ while (ip<iend)
+ {
+ // get runlength
+ token = *ip++;
+ if ((length=(token>>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; }
+
+ // copy literals
+ cpy = op+length;
+ if (cpy>oend-COPYLENGTH)
+ {
+ if (cpy > oend) goto _output_error;
+ memcpy(op, ip, length);
+ op += length;
+ break; // Necessarily EOF
+ }
+ LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy;
+ if (ip>=iend) break; // check EOF
+
+ // get offset
+ LZ4_READ_LITTLEENDIAN_16(ref,cpy,ip); ip+=2;
+ if (ref < (BYTE* const)dest) goto _output_error;
+
+ // get matchlength
+ if ((length=(token&ML_MASK)) == ML_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; }
+
+ // copy repeated sequence
+ if (op-ref<STEPSIZE)
+ {
+#if ARCH64
+ size_t dec2table[]={0, 4, 4, 3, 4, 5, 6, 7};
+ size_t dec2 = dec2table[op-ref];
+#else
+ const int dec2 = 0;
+#endif
+ *op++ = *ref++;
+ *op++ = *ref++;
+ *op++ = *ref++;
+ *op++ = *ref++;
+ ref -= dec[op-ref];
+ A32(op)=A32(ref); op += STEPSIZE-4; ref += STEPSIZE-4;
+ ref -= dec2;
+ } else { LZ4_COPYSTEP(ref,op); }
+ cpy = op + length - (STEPSIZE-4);
+ if (cpy>oend-COPYLENGTH)
+ {
+ if (cpy > oend) goto _output_error;
+ LZ4_SECURECOPY(ref, op, (oend-COPYLENGTH));
+ while(op<cpy) *op++=*ref++;
+ op=cpy;
+ if (op == oend) break; // Check EOF (should never happen, since last 5 bytes are supposed to be literals)
+ continue;
+ }
+ LZ4_SECURECOPY(ref, op, cpy);
+ op=cpy; // correction
+ }
+
+ // end of decoding
+ return (int) (((char*)op)-dest);
+
+ // write overflow error detected
+_output_error:
+ return (int) (-(((char*)ip)-source));
+}
+
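A short usage sketch of the two public entry points defined above (and declared
in lz4.h below). The worst-case output bound follows the "input size + 0.4%,
at least 8 bytes" note from the header; the variable names are illustrative:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "lz4.h"

    int main(void) {
      char src[] = "a repetitive payload a repetitive payload a repetitive payload";
      int isize = (int)sizeof(src);

      // Generous worst-case sizing so even incompressible input fits.
      char * compressed = (char*)malloc(isize + isize / 255 + 16);
      char * restored = (char*)malloc(isize);

      int csize = LZ4_compress(src, compressed, isize);           // bytes written
      int consumed = LZ4_uncompress(compressed, restored, isize); // bytes read

      // On success consumed == csize; a negative value flags corrupt input.
      printf("in=%d out=%d consumed=%d intact=%d\n", isize, csize, consumed,
             memcmp(src, restored, isize) == 0);
      free(compressed);
      free(restored);
      return 0;
    }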
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/lz4/lz4.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,96 @@
+/*
+ LZ4 - Fast LZ compression algorithm
+ Header File
+ Copyright (C) 2011, Yann Collet.
+ BSD License
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are
+ met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following disclaimer
+ in the documentation and/or other materials provided with the
+ distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+#pragma once
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+
+//****************************
+// Simple Functions
+//****************************
+
+int LZ4_compress (char* source, char* dest, int isize);
+int LZ4_uncompress (char* source, char* dest, int osize);
+
+/*
+LZ4_compress() :
+ return : the number of bytes in compressed buffer dest
+ note : destination buffer must be already allocated.
+    To avoid any problem, size it to handle worst-case situations (input data not compressible)
+ Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes.
+
+LZ4_uncompress() :
+ osize : is the output size, therefore the original size
+ return : the number of bytes read in the source buffer
+ If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
+ This version never writes beyond dest + osize, and is therefore protected against malicious data packets
+ note 2 : destination buffer must be already allocated
+*/
+
+
+//****************************
+// Advanced Functions
+//****************************
+
+int LZ4_uncompress_unknownOutputSize (char* source, char* dest, int isize, int maxOutputSize);
+
+/*
+LZ4_uncompress_unknownOutputSize() :
+ isize : is the input size, therefore the compressed size
+ maxOutputSize : is the size of the destination buffer (which must be already allocated)
+ return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize)
+ If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
+ This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets
+ note : This version is a bit slower than LZ4_uncompress
+*/
+
+
+int LZ4_compressCtx(void** ctx, char* source, char* dest, int isize);
+
+/*
+LZ4_compressCtx() :
+ This function explicitly handles the CTX memory structure.
+ It avoids allocating/deallocating memory between each call, improving performance when malloc is time-consuming.
+ Note : when memory is allocated into the stack (default mode), there is no "malloc" penalty.
+  Therefore, this function is mostly useful when memory is allocated into the heap (this requires increasing the HASH_LOG value beyond STACKLIMIT)
+
+ On first call : provide a *ctx=NULL; It will be automatically allocated.
+ On next calls : reuse the same ctx pointer.
+ Use different pointers for different threads when doing multi-threading.
+
+ note : performance difference is small, mostly noticeable in HeapMode when repetitively calling the compression function over many small segments.
+*/
+
+
+#if defined (__cplusplus)
+}
+#endif
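A sketch of the LZ4_compressCtx calling convention documented above: pass a
NULL context on the first call, reuse it afterwards, and release it when done.
With the default HASH_LOG of 12 the table lives on the stack and the context
stays NULL, so this only pays off in heap mode; names below are illustrative:

    #include <stdlib.h>
    #include "lz4.h"

    /* Compress n segments, reusing one context (one per thread when threading).
       Each outs[i] is assumed to be sized for the worst case. */
    static int compress_segments(char ** segs, int * lens, char ** outs, int n) {
      void * ctx = NULL;  /* LZ4_compressCtx allocates it on first use */
      int total = 0;
      for (int i = 0; i < n; i++) {
        total += LZ4_compressCtx(&ctx, segs[i], outs[i], lens[i]);
      }
      free(ctx);  /* no-op in stack mode; releases the table in heap mode */
      return total;
    }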
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/NativeTask.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/NativeTask.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/NativeTask.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/NativeTask.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVETASK_H_
+#define NATIVETASK_H_
+
+#include "lib/jniutils.h"
+#include <stdint.h>
+#include <string>
+#include <vector>
+#include <map>
+
+namespace NativeTask {
+
+using std::string;
+using std::vector;
+using std::map;
+using std::pair;
+
+/**
+ * NativeObjectType
+ */
+enum NativeObjectType {
+ UnknownObjectType = 0,
+ BatchHandlerType = 1,
+ MapperType = 2,
+ ReducerType = 3,
+ PartitionerType = 4,
+ CombinerType = 5,
+ FolderType = 6,
+ RecordReaderType = 7,
+ RecordWriterType = 8
+};
+
+/**
+ * Endianness setting
+ */
+enum Endium {
+ LITTLE_ENDIUM = 0,
+ LARGE_ENDIUM = 1
+};
+
+#define NATIVE_COMBINER "native.combiner.class"
+#define NATIVE_PARTITIONER "native.partitioner.class"
+#define NATIVE_MAPPER "native.mapper.class"
+#define NATIVE_RECORDREADER "native.recordreader.class"
+#define NATIVE_RECORDWRITER "native.recordwriter.class"
+
+#define NATIVE_REDUCER "native.reducer.class"
+#define NATIVE_HADOOP_VERSION "native.hadoop.version"
+
+#define NATIVE_INPUT_SPLIT "native.input.split"
+#define INPUT_LINE_KV_SEPERATOR "mapreduce.input.keyvaluelinerecordreader.key.value.separator"
+#define MAPRED_TEXTOUTPUT_FORMAT_SEPERATOR "mapreduce.output.textoutputformat.separator"
+#define MAPRED_WORK_OUT_DIR "mapreduce.task.output.dir"
+#define NATIVE_OUTPUT_FILE_NAME "native.output.file.name"
+#define MAPRED_COMPRESS_OUTPUT "mapreduce.output.fileoutputformat.compress"
+#define MAPRED_OUTPUT_COMPRESSION_CODEC "mapreduce.output.fileoutputformat.compress.codec"
+#define TOTAL_ORDER_PARTITIONER_PATH "total.order.partitioner.path"
+#define TOTAL_ORDER_PARTITIONER_MAX_TRIE_DEPTH "total.order.partitioner.max.trie.depth"
+#define FS_DEFAULT_NAME "fs.default.name"
+#define FS_DEFAULT_FS "fs.defaultFS"
+
+#define NATIVE_SORT_TYPE "native.sort.type"
+#define MAPRED_SORT_AVOID "mapreduce.sort.avoidance"
+#define NATIVE_SORT_MAX_BLOCK_SIZE "native.sort.blocksize.max"
+#define MAPRED_COMPRESS_MAP_OUTPUT "mapreduce.map.output.compress"
+#define MAPRED_MAP_OUTPUT_COMPRESSION_CODEC "mapreduce.map.output.compress.codec"
+#define MAPRED_MAPOUTPUT_KEY_CLASS "mapreduce.map.output.key.class"
+#define MAPRED_OUTPUT_KEY_CLASS "mapreduce.job.output.key.class"
+#define MAPRED_MAPOUTPUT_VALUE_CLASS "mapreduce.map.output.value.class"
+#define MAPRED_OUTPUT_VALUE_CLASS "mapreduce.job.output.value.class"
+#define MAPRED_IO_SORT_MB "mapreduce.task.io.sort.mb"
+#define MAPRED_NUM_REDUCES "mapreduce.job.reduces"
+#define MAPRED_COMBINE_CLASS_OLD "mapred.combiner.class"
+#define MAPRED_COMBINE_CLASS_NEW "mapreduce.job.combine.class"
+
+#define NATIVE_LOG_DEVICE "native.log.device"
+
+//format: name=path,name=path,name=path
+#define NATIVE_CLASS_LIBRARY_BUILDIN "native.class.library.buildin"
+
+#define NATIVE_MAPOUT_KEY_COMPARATOR "native.map.output.key.comparator"
+
+extern const std::string NativeObjectTypeToString(NativeObjectType type);
+extern NativeObjectType NativeObjectTypeFromString(const std::string type);
+
+/**
+ * Objects that can be loaded dynamically from a shared library
+ * and managed by NativeObjectFactory
+ */
+class NativeObject {
+public:
+ virtual NativeObjectType type() {
+ return UnknownObjectType;
+ }
+
+  virtual ~NativeObject() {
+  }
+};
+
+template<typename T>
+NativeObject * ObjectCreator() {
+ return new T();
+}
+
+typedef NativeObject * (*ObjectCreatorFunc)();
+
+typedef ObjectCreatorFunc (*GetObjectCreatorFunc)(const std::string & name);
+
+typedef void * (*FunctionGetter)(const std::string & name);
+
+typedef int32_t (*InitLibraryFunc)();
+
+/**
+ * Exceptions
+ */
+class HadoopException : public std::exception {
+private:
+ std::string _reason;
+public:
+ HadoopException(const string & what);
+ virtual ~HadoopException() throw () {
+ }
+
+ virtual const char* what() const throw () {
+ return _reason.c_str();
+ }
+};
+
+class OutOfMemoryException : public HadoopException {
+public:
+ OutOfMemoryException(const string & what)
+ : HadoopException(what) {
+ }
+};
+
+class IOException : public HadoopException {
+public:
+ IOException(const string & what)
+ : HadoopException(what) {
+ }
+};
+
+class UnsupportException : public HadoopException {
+public:
+ UnsupportException(const string & what)
+ : HadoopException(what) {
+ }
+};
+
+/**
+ * Exception when call java methods using JNI
+ */
+class JavaException : public HadoopException {
+public:
+ JavaException(const string & what)
+ : HadoopException(what) {
+ }
+};
+
+#define STRINGIFY(x) #x
+#define TOSTRING(x) STRINGIFY(x)
+#define AT __FILE__ ":" TOSTRING(__LINE__)
+#define THROW_EXCEPTION(type, what) throw type((std::string(AT":") + what))
+#define THROW_EXCEPTION_EX(type, fmt, args...) \
+ throw type(StringUtil::Format("%s:" fmt, AT, ##args))
+
+class Config {
+protected:
+ map<string, string> _configs;
+public:
+ Config() {
+ }
+ ~Config() {
+ }
+
+ const char * get(const string & name);
+
+ string get(const string & name, const string & defaultValue);
+
+ bool getBool(const string & name, bool defaultValue);
+
+ int64_t getInt(const string & name, int64_t defaultValue = -1);
+
+ float getFloat(const string & name, float defaultValue = -1);
+
+ void getStrings(const string & name, vector<string> & dest);
+
+ void getInts(const string & name, vector<int64_t> & dest);
+
+ void getFloats(const string & name, vector<float> & dest);
+
+ void set(const string & key, const string & value);
+
+ void setInt(const string & name, int64_t value);
+
+ void setBool(const string & name, bool value);
+
+ /**
+ * Load configs from a config file with the following format:
+ * # comment
+ * key1=value1
+ * key2=value2
+ * ...
+ */
+ void load(const string & path);
+
+ /**
+   * Load configs from command line args
+ * key1=value1 key2=value2,value2
+ */
+ void parse(int32_t argc, const char ** argv);
+};
+
+class Command {
+private:
+ int _id;
+ const char * _description;
+
+public:
+ Command(int id, const char * description)
+ : _id(id), _description(description) {
+ }
+
+ Command(int id)
+ : _id(id), _description(NULL) {
+ }
+
+ int id() const {
+ return _id;
+ }
+
+ const char * description() const {
+ return _description;
+ }
+
+ bool equals(const Command & other) const {
+ if (_id == other._id) {
+ return true;
+ }
+ return false;
+ }
+};
+
+class Buffer {
+protected:
+ const char * _data;
+ uint32_t _length;
+
+public:
+ Buffer()
+ : _data(NULL), _length(0) {
+ }
+
+ Buffer(const char * data, uint32_t length)
+ : _data(data), _length(length) {
+ }
+
+ ~Buffer() {
+ }
+
+ void reset(const char * data, uint32_t length) {
+ this->_data = data;
+ this->_length = length;
+ }
+
+ const char * data() const {
+ return _data;
+ }
+
+ uint32_t length() const {
+ return _length;
+ }
+
+ void data(const char * data) {
+ this->_data = data;
+ }
+
+ void length(uint32_t length) {
+ this->_length = length;
+ }
+
+ string toString() const {
+ return string(_data, _length);
+ }
+};
+
+class InputSplit {
+public:
+ virtual uint64_t getLength() = 0;
+ virtual vector<string> & getLocations() = 0;
+ virtual void readFields(const string & data) = 0;
+ virtual void writeFields(string & dest) = 0;
+ virtual string toString() = 0;
+
+ virtual ~InputSplit() {
+
+ }
+};
+
+class Configurable : public NativeObject {
+public:
+ Configurable() {
+ }
+
+ virtual void configure(Config * config) {
+ }
+};
+
+class Collector {
+public:
+ virtual ~Collector() {
+ }
+
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
+ }
+
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen,
+ int32_t partition) {
+ collect(key, keyLen, value, valueLen);
+ }
+};
+
+class Progress {
+public:
+ virtual ~Progress() {
+ }
+ virtual float getProgress() = 0;
+};
+
+class Counter {
+private:
+ // not thread safe
+ // TODO: use atomic
+ volatile uint64_t _count;
+
+ string _group;
+ string _name;
+public:
+ Counter(const string & group, const string & name)
+ : _count(0), _group(group), _name(name) {
+ }
+
+ const string & group() const {
+ return _group;
+ }
+ const string & name() const {
+ return _name;
+ }
+
+ uint64_t get() const {
+ return _count;
+ }
+
+ void increase() {
+ _count++;
+ }
+
+ void increase(uint64_t cnt) {
+ _count += cnt;
+ }
+};
+
+class KVIterator {
+public:
+ virtual ~KVIterator() {
+ }
+ virtual bool next(Buffer & key, Buffer & value) = 0;
+};
+
+class RecordReader : public KVIterator, public Configurable, public Progress {
+public:
+ virtual NativeObjectType type() {
+ return RecordReaderType;
+ }
+
+ virtual bool next(Buffer & key, Buffer & value) = 0;
+
+ virtual float getProgress() = 0;
+
+ virtual void close() = 0;
+};
+
+class RecordWriter : public Collector, public Configurable {
+public:
+ virtual NativeObjectType type() {
+ return RecordWriterType;
+ }
+
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
+ }
+
+ virtual void close() {
+ }
+
+};
+
+class ProcessorBase : public Configurable {
+protected:
+ Collector * _collector;
+public:
+ ProcessorBase()
+ : _collector(NULL) {
+ }
+
+ void setCollector(Collector * collector) {
+ _collector = collector;
+ }
+
+ Collector * getCollector() {
+ return _collector;
+ }
+
+ void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
+ _collector->collect(key, keyLen, value, valueLen);
+ }
+
+ void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen,
+ int32_t partition) {
+ _collector->collect(key, keyLen, value, valueLen, partition);
+ }
+
+ Counter * getCounter(const string & group, const string & name);
+
+ virtual void close() {
+ }
+};
+
+class Mapper : public ProcessorBase {
+public:
+ virtual NativeObjectType type() {
+ return MapperType;
+ }
+
+ /**
+   * Map interface; the default implementation is an identity mapper
+ */
+ virtual void map(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
+ collect(key, keyLen, value, valueLen);
+ }
+};
+
+class Partitioner : public Configurable {
+public:
+ virtual NativeObjectType type() {
+ return PartitionerType;
+ }
+
+ /**
+ * Partition interface
+ * @param key key buffer
+ * @param keyLen key length, can be modified to smaller value
+ * to truncate key
+ * @return partition number
+ */
+ virtual uint32_t getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition);
+};
+
+enum KeyGroupIterState {
+ SAME_KEY,
+ NEW_KEY,
+ NEW_KEY_VALUE,
+ NO_MORE,
+};
+
+class KeyGroupIterator {
+public:
+ virtual ~KeyGroupIterator() {
+ }
+ /**
+   * Advance to the next key, or position at the first key on the initial call
+ */
+ virtual bool nextKey() = 0;
+
+ /**
+ * Get key of this input group
+ */
+ virtual const char * getKey(uint32_t & len) = 0;
+
+ /**
+ * Get next value of this input group
+ * @return NULL if no more
+ */
+ virtual const char * nextValue(uint32_t & len) = 0;
+};
+
+class Reducer : public ProcessorBase {
+public:
+ virtual NativeObjectType type() {
+ return ReducerType;
+ }
+
+ /**
+   * Reduce interface; the default implementation is an identity reducer
+ */
+ virtual void reduce(KeyGroupIterator & input) {
+ const char * key;
+ const char * value;
+ uint32_t keyLen;
+ uint32_t valueLen;
+ key = input.getKey(keyLen);
+ while (NULL != (value = input.nextValue(valueLen))) {
+ collect(key, keyLen, value, valueLen);
+ }
+ }
+};
+
+/**
+ * Folder API used for hashtable based aggregation
+ * Folder will be used in this way:
+ * on(key, value):
+ * state = hashtable.get(key)
+ * if state == None:
+ * size = size()
+ * if size == -1:
+ * state = init(null, -1)
+ * elif size > 0:
+ * state = fixallocator.get(key)
+ * init(state, size)
+ * folder(state, value, value.len)
+ *
+ * final():
+ * for k,state in hashtable:
+ * final(key, key.len, state)
+ */
+class Folder : public ProcessorBase {
+public:
+ virtual NativeObjectType type() {
+ return FolderType;
+ }
+
+ /**
+ * Get aggregator state size
+ * @return state storage size
+ * -1 size not fixed or unknown, default
+ * e.g. list map tree
+ * 0 don't need to store state
+ * >0 fixed sized state
+ * e.g. int32 int64 float.
+ */
+ virtual int32_t size() {
+ return -1;
+ }
+
+ /**
+ * Create and/or init new state
+ */
+ virtual void * init(const char * key, uint32_t keyLen) {
+ return NULL;
+ }
+
+ /**
+ * Aggregation function
+ */
+ virtual void folder(void * dest, const char * value, uint32_t valueLen) {
+ }
+
+ virtual void final(const char * key, uint32_t keyLen, void * dest) {
+ }
+};
+
+enum KeyValueType {
+ TextType = 0,
+ BytesType = 1,
+ ByteType = 2,
+ BoolType = 3,
+ IntType = 4,
+ LongType = 5,
+ FloatType = 6,
+ DoubleType = 7,
+ MD5HashType = 8,
+ VIntType = 9,
+ VLongType = 10,
+ UnknownType = -1
+};
+
+typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength);
+
+ComparatorPtr get_comparator(const KeyValueType keyType, const char * comparatorName);
+
+typedef void (*ANY_FUNC_PTR)();
+
+} // namespace NativeTask;
+
+/**
+ * Use these two predefined macros to define a class library:
+ *   DEFINE_NATIVE_LIBRARY(Library)
+ *   REGISTER_CLASS(Type, Library)
+ * For example, suppose a demo application defines the classes
+ * MyDemoMapper and MyDemoReducer. To register this module and
+ * these two classes, add the following code to your source:
+ * DEFINE_NATIVE_LIBRARY(MyDemo) {
+ * REGISTER_CLASS(MyDemoMapper, MyDemo);
+ * REGISTER_CLASS(MyDemoReducer, MyDemo);
+ * }
+ * The registered class name for MyDemoMapper will be MyDemo.MyDemoMapper,
+ * and similarly for MyDemoReducer.
+ * Then you can set native.mapper.class to MyDemo.MyDemoMapper
+ * in JobConf.
+ */
+
+#define DEFINE_NATIVE_LIBRARY(Library) \
+ static std::map<std::string, NativeTask::ObjectCreatorFunc> Library##ClassMap__; \
+ extern "C" void * Library##GetFunctionGetter(const std::string & name) { \
+ void * ret = NULL; \
+ std::map<std::string, NativeTask::ObjectCreatorFunc>::iterator itr = Library##ClassMap__.find(name); \
+ if (itr != Library##ClassMap__.end()) { \
+ return (void *)(itr->second); \
+ } \
+ return NULL; \
+ } \
+ extern "C" NativeTask::ObjectCreatorFunc Library##GetObjectCreator(const std::string & name) { \
+ NativeObject * ret = NULL; \
+ std::map<std::string, NativeTask::ObjectCreatorFunc>::iterator itr = Library##ClassMap__.find(name); \
+ if (itr != Library##ClassMap__.end()) { \
+ return itr->second; \
+ } \
+ return NULL; \
+ } \
+ extern "C" void Library##Init()
+
+#define REGISTER_CLASS(Type, Library) Library##ClassMap__[#Library"."#Type] = NativeTask::ObjectCreator<Type>
+
+#define REGISTER_FUNCTION(Type, Library) Library##ClassMap__[#Library"."#Type] = (ObjectCreatorFunc)Type
+
+#endif /* NATIVETASK_H_ */
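To make the registration flow concrete, a sketch of a library with one custom
mapper, following the MyDemo naming from the header comment above (the class
body is illustrative, not part of this commit):

    #include <cctype>
    #include <string>
    #include "NativeTask.h"

    using NativeTask::Mapper;
    using NativeTask::NativeObject;  // referenced inside DEFINE_NATIVE_LIBRARY

    // An upper-casing mapper built on the Mapper interface declared above.
    class MyDemoMapper : public Mapper {
    public:
      virtual void map(const char * key, uint32_t keyLen,
                       const char * value, uint32_t valueLen) {
        std::string upper(value, valueLen);
        for (size_t i = 0; i < upper.size(); i++) {
          upper[i] = (char)toupper((unsigned char)upper[i]);
        }
        collect(key, keyLen, upper.data(), (uint32_t)upper.length());
      }
    };

    // Expands to MyDemoGetFunctionGetter/MyDemoGetObjectCreator/MyDemoInit;
    // the block below becomes the body of MyDemoInit().
    DEFINE_NATIVE_LIBRARY(MyDemo) {
      REGISTER_CLASS(MyDemoMapper, MyDemo);
    }
    // The mapper is then selected with native.mapper.class=MyDemo.MyDemoMapper.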
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "NativeTask.h"
+#include "BlockCodec.h"
+
+namespace NativeTask {
+
+BlockCompressStream::BlockCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
+ : CompressStream(stream), _compressedBytesWritten(0), _tempBufferSize(0), _tempBuffer(NULL) {
+ _hint = bufferSizeHint;
+ _blockMax = bufferSizeHint / 2 * 3;
+}
+
+void BlockCompressStream::init() {
+ _tempBufferSize = maxCompressedLength(_blockMax) + 8;
+ _tempBuffer = new char[_tempBufferSize];
+}
+
+BlockCompressStream::~BlockCompressStream() {
+ delete[] _tempBuffer;
+ _tempBuffer = NULL;
+ _tempBufferSize = 0;
+}
+
+void BlockCompressStream::write(const void * buff, uint32_t length) {
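+  // Feed the input to compressOneBlock() in pieces: a remainder smaller than
+  // _blockMax (1.5x the buffer size hint) goes out as one block, anything
+  // larger is cut into _hint-sized blocks.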
+ while (length > 0) {
+ uint32_t take = length < _blockMax ? length : _hint;
+ compressOneBlock(buff, take);
+ buff = ((const char *)buff) + take;
+ length -= take;
+ }
+}
+
+void BlockCompressStream::flush() {
+ _stream->flush();
+}
+
+void BlockCompressStream::close() {
+ flush();
+}
+
+void BlockCompressStream::writeDirect(const void * buff, uint32_t length) {
+ _stream->write(buff, length);
+ _compressedBytesWritten += length;
+}
+
+uint64_t BlockCompressStream::compressedBytesWritten() {
+ return _compressedBytesWritten;
+}
+
+//////////////////////////////////////////////////////////////
+
+BlockDecompressStream::BlockDecompressStream(InputStream * stream, uint32_t bufferSizeHint)
+ : DecompressStream(stream), _tempBufferSize(0), _tempBuffer(NULL) {
+ _hint = bufferSizeHint;
+ _blockMax = bufferSizeHint / 2 * 3;
+ _tempDecompressBuffer = NULL;
+ _tempDecompressBufferSize = 0;
+ _tempDecompressBufferUsed = 0;
+ _tempDecompressBufferCapacity = 0;
+ _compressedBytesRead = 0;
+}
+
+void BlockDecompressStream::init() {
+ _tempBufferSize = maxCompressedLength(_blockMax) + 8;
+ _tempBuffer = (char*)malloc(_tempBufferSize);
+}
+
+BlockDecompressStream::~BlockDecompressStream() {
+ close();
+ if (NULL != _tempBuffer) {
+ free(_tempBuffer);
+ _tempBuffer = NULL;
+ }
+ _tempBufferSize = 0;
+}
+
+int32_t BlockDecompressStream::read(void * buff, uint32_t length) {
+ if (_tempDecompressBufferSize == 0) {
+ uint32_t sizes[2];
+ int32_t rd = _stream->readFully(&sizes, sizeof(uint32_t) * 2);
+ if (rd <= 0) {
+ // EOF
+ return -1;
+ }
+ if (rd != sizeof(uint32_t) * 2) {
+      THROW_EXCEPTION(IOException, "readFully got incomplete data");
+ }
+ _compressedBytesRead += rd;
+ sizes[0] = bswap(sizes[0]);
+ sizes[1] = bswap(sizes[1]);
+ if (sizes[0] <= length) {
+ uint32_t len = decompressOneBlock(sizes[1], buff, sizes[0]);
+ if (len != sizes[0]) {
+ THROW_EXCEPTION(IOException, "Block decompress data error, length not match");
+ }
+ return len;
+ } else {
+ if (sizes[0] > _tempDecompressBufferCapacity) {
+ char * newBuffer = (char *)realloc(_tempDecompressBuffer, sizes[0]);
+ if (newBuffer == NULL) {
+ THROW_EXCEPTION(OutOfMemoryException, "realloc failed");
+ }
+ _tempDecompressBuffer = newBuffer;
+ _tempDecompressBufferCapacity = sizes[0];
+ }
+ uint32_t len = decompressOneBlock(sizes[1], _tempDecompressBuffer, sizes[0]);
+ if (len != sizes[0]) {
+ THROW_EXCEPTION(IOException, "Block decompress data error, length not match");
+ }
+ _tempDecompressBufferSize = sizes[0];
+ _tempDecompressBufferUsed = 0;
+ }
+ }
+ if (_tempDecompressBufferSize > 0) {
+ uint32_t left = _tempDecompressBufferSize - _tempDecompressBufferUsed;
+ if (length < left) {
+ memcpy(buff, _tempDecompressBuffer + _tempDecompressBufferUsed, length);
+ _tempDecompressBufferUsed += length;
+ return length;
+ } else {
+ memcpy(buff, _tempDecompressBuffer + _tempDecompressBufferUsed, left);
+ _tempDecompressBufferSize = 0;
+ _tempDecompressBufferUsed = 0;
+ return left;
+ }
+ }
+ // should not get here
+ THROW_EXCEPTION(IOException, "Decompress logic error");
+ return -1;
+}
+
+void BlockDecompressStream::close() {
+ if (_tempDecompressBufferSize > 0) {
+ LOG("[BlockDecompressStream] Some data left in the _tempDecompressBuffer when close()");
+ }
+ if (NULL != _tempDecompressBuffer) {
+ free(_tempDecompressBuffer);
+ _tempDecompressBuffer = NULL;
+ _tempDecompressBufferCapacity = 0;
+ }
+ _tempDecompressBufferSize = 0;
+ _tempDecompressBufferUsed = 0;
+}
+
+int32_t BlockDecompressStream::readDirect(void * buff, uint32_t length) {
+ if (_tempDecompressBufferSize > 0) {
+ THROW_EXCEPTION(IOException, "temp decompress data exists when call readDirect()");
+ }
+ int32_t ret = _stream->readFully(buff, length);
+ if (ret > 0) {
+ _compressedBytesRead += ret;
+ }
+ return ret;
+}
+
+uint64_t BlockDecompressStream::compressedBytesRead() {
+ return _compressedBytesRead;
+}
+
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/BlockCodec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BLOCKCODEC_H_
+#define BLOCKCODEC_H_
+
+#include "Compressions.h"
+
+namespace NativeTask {
+
+class BlockCompressStream : public CompressStream {
+protected:
+ uint32_t _hint;
+ uint32_t _blockMax;
+ char * _tempBuffer;
+ uint32_t _tempBufferSize;
+ uint64_t _compressedBytesWritten;
+public:
+ BlockCompressStream(OutputStream * stream, uint32_t bufferSizeHint);
+
+ virtual ~BlockCompressStream();
+
+ virtual void write(const void * buff, uint32_t length);
+
+ virtual void flush();
+
+ virtual void close();
+
+ virtual void writeDirect(const void * buff, uint32_t length);
+
+ virtual uint64_t compressedBytesWritten();
+
+ void init();
+
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength) {
+ return origLength;
+ }
+
+ virtual void compressOneBlock(const void * buff, uint32_t length) {
+ }
+};
+
+class BlockDecompressStream : public DecompressStream {
+protected:
+ uint32_t _hint;
+ uint32_t _blockMax;
+ char * _tempBuffer;
+ uint32_t _tempBufferSize;
+ char * _tempDecompressBuffer;
+ uint32_t _tempDecompressBufferSize;
+ uint32_t _tempDecompressBufferUsed;
+ uint32_t _tempDecompressBufferCapacity;
+ uint64_t _compressedBytesRead;
+public:
+ BlockDecompressStream(InputStream * stream, uint32_t bufferSizeHint);
+
+ virtual ~BlockDecompressStream();
+
+ virtual int32_t read(void * buff, uint32_t length);
+
+ virtual void close();
+
+ virtual int32_t readDirect(void * buff, uint32_t length);
+
+ virtual uint64_t compressedBytesRead();
+
+ void init();
+
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength) {
+ return origLength;
+ }
+
+ virtual uint32_t decompressOneBlock(uint32_t compressedSize, void * buff, uint32_t length) {
+ //TODO: add implementation
+ return 0;
+ }
+};
+
+} // namespace NativeTask
+
+#endif /* BLOCKCODEC_H_ */
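The two protected virtuals above are the extension points for concrete codecs.
A hypothetical LZ4-backed compress stream, sketched from the framing that
BlockDecompressStream::read() parses (two byte-swapped 32-bit lengths,
uncompressed then compressed, followed by the payload); the concrete codec
class in this branch may differ:

    #include "BlockCodec.h"
    #include "lz4.h"

    namespace NativeTask {

    class Lz4LikeCompressStream : public BlockCompressStream {
    public:
      Lz4LikeCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
          : BlockCompressStream(stream, bufferSizeHint) {
        init();  // allocates _tempBuffer of maxCompressedLength(_blockMax) + 8
      }

    protected:
      virtual uint64_t maxCompressedLength(uint64_t origLength) {
        // LZ4 worst case per lz4.h: input size + 0.4%, at least 8 extra bytes.
        return origLength + origLength / 255 + 16;
      }

      virtual void compressOneBlock(const void * buff, uint32_t length) {
        // Frame: [bswap(srcLen)][bswap(compressedLen)][compressed bytes],
        // mirroring BlockDecompressStream::read(); bswap() is the helper
        // already used in BlockCodec.cc.
        int ret = LZ4_compress((char*)buff, _tempBuffer + 8, (int)length);
        if (ret <= 0) {
          THROW_EXCEPTION(IOException, "LZ4_compress failed");
        }
        ((uint32_t*)_tempBuffer)[0] = bswap(length);
        ((uint32_t*)_tempBuffer)[1] = bswap((uint32_t)ret);
        writeDirect(_tempBuffer, (uint32_t)ret + 8);  // also updates byte counter
      }
    };

    } // namespace NativeTask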
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <zconf.h>
+#include <zlib.h>
+#include "commons.h"
+#include "GzipCodec.h"
+
+namespace NativeTask {
+
+GzipCompressStream::GzipCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
+ : CompressStream(stream), _compressedBytesWritten(0), _zstream(NULL), _finished(false) {
+ _buffer = new char[bufferSizeHint];
+ _capacity = bufferSizeHint;
+ _zstream = malloc(sizeof(z_stream));
+ z_stream * zstream = (z_stream*)_zstream;
+ memset(zstream, 0, sizeof(z_stream));
+ // windowBits 31 = 15 (32KB window) + 16 (gzip wrapper instead of zlib)
+ if (Z_OK != deflateInit2(zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8,
+ Z_DEFAULT_STRATEGY)) {
+ free(_zstream);
+ _zstream = NULL;
+ THROW_EXCEPTION(IOException, "deflateInit2 failed");
+ }
+ zstream->next_out = (Bytef *)_buffer;
+ zstream->avail_out = _capacity;
+}
+
+GzipCompressStream::~GzipCompressStream() {
+ if (_zstream != NULL) {
+ deflateEnd((z_stream*)_zstream); // release zlib's internal state
+ free(_zstream);
+ _zstream = NULL;
+ }
+ delete[] _buffer;
+ _buffer = NULL;
+}
+
+void GzipCompressStream::write(const void * buff, uint32_t length) {
+ z_stream * zstream = (z_stream*)_zstream;
+ zstream->next_in = (Bytef*)buff;
+ zstream->avail_in = length;
+ while (true) {
+ int ret = deflate(zstream, Z_NO_FLUSH);
+ if (ret == Z_OK) {
+ if (zstream->avail_out == 0) {
+ _stream->write(_buffer, _capacity);
+ _compressedBytesWritten += _capacity;
+ zstream->next_out = (Bytef *)_buffer;
+ zstream->avail_out = _capacity;
+ }
+ if (zstream->avail_in == 0) {
+ break;
+ }
+ } else {
+ THROW_EXCEPTION(IOException, "deflate returned error");
+ }
+ }
+ _finished = false; // new data invalidates any previous Z_FINISH
+}
+
+void GzipCompressStream::flush() {
+ z_stream * zstream = (z_stream*)_zstream;
+ while (true) {
+ int ret = deflate(zstream, Z_FINISH);
+ if (ret == Z_OK) {
+ if (zstream->avail_out == 0) {
+ _stream->write(_buffer, _capacity);
+ _compressedBytesWritten += _capacity;
+ zstream->next_out = (Bytef *)_buffer;
+ zstream->avail_out = _capacity;
+ } else {
+ THROW_EXCEPTION(IOException, "flush state error");
+ }
+ } else if (ret == Z_STREAM_END) {
+ size_t wt = zstream->next_out - (Bytef*)_buffer;
+ _stream->write(_buffer, wt);
+ _compressedBytesWritten += wt;
+ zstream->next_out = (Bytef *)_buffer;
+ zstream->avail_out = _capacity;
+ break;
+ } else {
+ // guard against looping forever on Z_STREAM_ERROR/Z_BUF_ERROR
+ THROW_EXCEPTION(IOException, "deflate(Z_FINISH) returned error");
+ }
+ }
+ _finished = true;
+ _stream->flush();
+}
+
+void GzipCompressStream::resetState() {
+ z_stream * zstream = (z_stream*)_zstream;
+ deflateReset(zstream);
+}
+
+void GzipCompressStream::close() {
+ if (!_finished) {
+ flush();
+ }
+}
+
+void GzipCompressStream::writeDirect(const void * buff, uint32_t length) {
+ if (!_finished) {
+ flush();
+ }
+ _stream->write(buff, length);
+ _compressedBytesWritten += length;
+}
+
+//////////////////////////////////////////////////////////////
+
+GzipDecompressStream::GzipDecompressStream(InputStream * stream, uint32_t bufferSizeHint)
+ : DecompressStream(stream), _compressedBytesRead(0), _zstream(NULL) {
+ _buffer = new char[bufferSizeHint];
+ _capacity = bufferSizeHint;
+ _zstream = malloc(sizeof(z_stream));
+ z_stream * zstream = (z_stream*)_zstream;
+ memset(zstream, 0, sizeof(z_stream));
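+ // windowBits 31 = 15 (32KB window) + 16 (expect a gzip wrapper)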
+ if (Z_OK != inflateInit2(zstream, 31)) {
+ free(_zstream);
+ _zstream = NULL;
+ THROW_EXCEPTION(IOException, "inflateInit2 failed");
+ }
+ zstream->next_in = NULL;
+ zstream->avail_in = 0;
+ _eof = false;
+}
+
+GzipDecompressStream::~GzipDecompressStream() {
+ if (_zstream != NULL) {
+ inflateEnd((z_stream*)_zstream); // release zlib's internal state
+ free(_zstream);
+ _zstream = NULL;
+ }
+ delete[] _buffer;
+ _buffer = NULL;
+}
+
+int32_t GzipDecompressStream::read(void * buff, uint32_t length) {
+ z_stream * zstream = (z_stream*)_zstream;
+ zstream->next_out = (Bytef*)buff;
+ zstream->avail_out = length;
+ while (true) {
+ if (zstream->avail_in == 0) {
+ int32_t rd = _stream->read(_buffer, _capacity);
+ if (rd <= 0) {
+ _eof = true;
+ size_t wt = zstream->next_out - (Bytef*)buff;
+ return wt > 0 ? wt : -1;
+ } else {
+ _compressedBytesRead += rd;
+ zstream->next_in = (Bytef*)_buffer;
+ zstream->avail_in = rd;
+ }
+ }
+ int ret = inflate(zstream, Z_NO_FLUSH);
+ if (ret == Z_OK || ret == Z_STREAM_END) {
+ if (zstream->avail_out == 0) {
+ return length;
+ }
+ } else {
+ return -1;
+ }
+ }
+ return -1; // not reached
+}
+
+void GzipDecompressStream::close() {
+}
+
+int32_t GzipDecompressStream::readDirect(void * buff, uint32_t length) {
+ int32_t ret = _stream->readFully(buff, length);
+ if (ret > 0) {
+ _compressedBytesRead += ret;
+ }
+ return ret;
+}
+
+} // namespace NativeTask
+
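
The stream above wraps raw zlib: deflateInit2 with windowBits 31 selects a gzip
wrapper (15-bit window plus 16 for the gzip header/trailer), write() drains
with Z_NO_FLUSH, and flush() finishes the member with Z_FINISH. A minimal
standalone sketch of the same pattern, assuming only zlib (gzipCompress is an
illustrative name, not a NativeTask API):

    #include <cstring>
    #include <stdexcept>
    #include <vector>
    #include <zlib.h>

    std::vector<char> gzipCompress(const char * input, size_t length) {
      z_stream zs;
      memset(&zs, 0, sizeof(zs));
      // Same parameters as GzipCompressStream's constructor.
      if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8,
                       Z_DEFAULT_STRATEGY) != Z_OK) {
        throw std::runtime_error("deflateInit2 failed");
      }
      std::vector<char> out(deflateBound(&zs, length));
      zs.next_in = (Bytef *)input;
      zs.avail_in = (uInt)length;
      zs.next_out = (Bytef *)out.data();
      zs.avail_out = (uInt)out.size();
      int ret = deflate(&zs, Z_FINISH);  // output sized via deflateBound,
      deflateEnd(&zs);                   // so one call reaches Z_STREAM_END
      if (ret != Z_STREAM_END) {
        throw std::runtime_error("deflate failed");
      }
      out.resize(out.size() - zs.avail_out);
      return out;
    }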
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/GzipCodec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GZIPCODEC_H_
+#define GZIPCODEC_H_
+
+#include "Compressions.h"
+
+namespace NativeTask {
+
+class GzipCompressStream : public CompressStream {
+protected:
+ uint64_t _compressedBytesWritten;
+ char * _buffer;
+ uint32_t _capacity;
+ void * _zstream;
+ bool _finished;
+public:
+ GzipCompressStream(OutputStream * stream, uint32_t bufferSizeHint);
+
+ virtual ~GzipCompressStream();
+
+ virtual void write(const void * buff, uint32_t length);
+
+ virtual void flush();
+
+ virtual void close();
+
+ virtual void finish() {
+ flush();
+ }
+
+ virtual void resetState();
+
+ virtual void writeDirect(const void * buff, uint32_t length);
+
+ virtual uint64_t compressedBytesWritten() {
+ return _compressedBytesWritten;
+ }
+};
+
+class GzipDecompressStream : public DecompressStream {
+protected:
+ uint64_t _compressedBytesRead;
+ char * _buffer;
+ uint32_t _capacity;
+ void * _zstream;
+ bool _eof;
+public:
+ GzipDecompressStream(InputStream * stream, uint32_t bufferSizeHint);
+
+ virtual ~GzipDecompressStream();
+
+ virtual int32_t read(void * buff, uint32_t length);
+
+ virtual void close();
+
+ virtual int32_t readDirect(void * buff, uint32_t length);
+
+ virtual uint64_t compressedBytesRead() {
+ return _compressedBytesRead;
+ }
+};
+
+} // namespace NativeTask
+
+#endif /* GZIPCODEC_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "NativeTask.h"
+#include "Lz4Codec.h"
+
+extern "C" {
+extern int LZ4_compress(char* source, char* dest, int isize);
+extern int LZ4_uncompress(char* source, char* dest, int osize);
+
+/*
+ LZ4_compress() :
+ return : the number of bytes in compressed buffer dest
+ note : destination buffer must be already allocated.
+ To avoid any problem, size it to handle worst cases situations (input data not compressible)
+ Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes.
+
+ LZ4_uncompress() :
+ osize : is the output size, therefore the original size
+ return : the number of bytes read in the source buffer
+ If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction
+ This version never writes beyond dest + osize, and is therefore protected against malicious data packets
+ note 2 : destination buffer must be already allocated
+ */
+}
+
+namespace NativeTask {
+
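+// Worst case per the LZ4_compress note above: input size + 0.4% (rounded
+// up to 0.5% here), and never less than input size + 8 bytes.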
+static int32_t LZ4_MaxCompressedSize(int32_t orig) {
+ return std::max((int32_t)(orig * 1.005), orig + 8);
+}
+
+Lz4CompressStream::Lz4CompressStream(OutputStream * stream, uint32_t bufferSizeHint)
+ : BlockCompressStream(stream, bufferSizeHint) {
+ init();
+}
+
+void Lz4CompressStream::compressOneBlock(const void * buff, uint32_t length) {
+ int ret = LZ4_compress((char*)buff, _tempBuffer + 8, length);
+ if (ret > 0) {
+ uint32_t compressedLength = (uint32_t)ret;
+ // 8-byte block header: byte-swapped raw length, then compressed length
+ ((uint32_t*)_tempBuffer)[0] = bswap(length);
+ ((uint32_t*)_tempBuffer)[1] = bswap(compressedLength);
+ _stream->write(_tempBuffer, compressedLength + 8);
+ _compressedBytesWritten += (compressedLength + 8);
+ } else {
+ THROW_EXCEPTION(IOException, "compress LZ4 failed");
+ }
+}
+
+uint64_t Lz4CompressStream::maxCompressedLength(uint64_t origLength) {
+ return LZ4_MaxCompressedSize(origLength);
+}
+
+//////////////////////////////////////////////////////////////
+
+Lz4DecompressStream::Lz4DecompressStream(InputStream * stream, uint32_t bufferSizeHint)
+ : BlockDecompressStream(stream, bufferSizeHint) {
+ init();
+}
+
+uint32_t Lz4DecompressStream::decompressOneBlock(uint32_t compressedSize, void * buff,
+ uint32_t length) {
+ if (compressedSize > _tempBufferSize) {
+ char * newBuffer = (char *)realloc(_tempBuffer, compressedSize);
+ if (newBuffer == NULL) {
+ THROW_EXCEPTION(OutOfMemoryException, "realloc failed");
+ }
+ _tempBuffer = newBuffer;
+ _tempBufferSize = compressedSize;
+ }
+ uint32_t rd = _stream->readFully(_tempBuffer, compressedSize);
+ if (rd != compressedSize) {
+ THROW_EXCEPTION(IOException, "readFully reached EOF");
+ }
+ _compressedBytesRead += rd;
+ // LZ4_uncompress returns the number of compressed bytes consumed, or a
+ // negative value on malformed input.
+ int ret = LZ4_uncompress(_tempBuffer, (char*)buff, length);
+ if (ret == (int)compressedSize) {
+ return length;
+ } else {
+ THROW_EXCEPTION(IOException, "decompress LZ4 failed");
+ }
+}
+
+uint64_t Lz4DecompressStream::maxCompressedLength(uint64_t origLength) {
+ return LZ4_MaxCompressedSize(origLength);
+}
+
+} // namespace NativeTask
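
A round-trip sketch against the two entry points declared above (the legacy API
of the lz4.c bundled in this commit; newer liblz4 releases deprecate these in
favor of LZ4_compress_default/LZ4_decompress_safe):

    #include <cstring>

    extern "C" {
    int LZ4_compress(char * source, char * dest, int isize);
    int LZ4_uncompress(char * source, char * dest, int osize);
    }

    int main() {
      char src[] = "hello hello hello hello hello";
      const int srcLen = (int)sizeof(src);
      // Worst case per the note above: input + 0.4%, at least +8 bytes.
      char compressed[sizeof(src) + 8];
      int compressedLen = LZ4_compress(src, compressed, srcLen);
      if (compressedLen <= 0) return 1;

      char restored[sizeof(src)];
      // osize is the ORIGINAL size; the return value is the number of
      // bytes consumed from the compressed buffer.
      int consumed = LZ4_uncompress(compressed, restored, srcLen);
      if (consumed != compressedLen) return 1;
      return memcmp(src, restored, srcLen) != 0 ? 1 : 0;
    }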
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LZ4CODEC_H_
+#define LZ4CODEC_H_
+
+#include "Compressions.h"
+#include "BlockCodec.h"
+
+namespace NativeTask {
+
+class Lz4CompressStream : public BlockCompressStream {
+public:
+ Lz4CompressStream(OutputStream * stream, uint32_t bufferSizeHint);
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength);
+ virtual void compressOneBlock(const void * buff, uint32_t length);
+};
+
+class Lz4DecompressStream : public BlockDecompressStream {
+public:
+ Lz4DecompressStream(InputStream * stream, uint32_t bufferSizeHint);
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength);
+ virtual uint32_t decompressOneBlock(uint32_t compressedSize, void * buff, uint32_t length);
+};
+
+} // namespace NativeTask
+
+#endif /* LZ4CODEC_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snappy-c.h"
+#include "commons.h"
+#include "NativeTask.h"
+#include "SnappyCodec.h"
+
+namespace NativeTask {
+
+SnappyCompressStream::SnappyCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
+ : BlockCompressStream(stream, bufferSizeHint) {
+ init();
+}
+
+void SnappyCompressStream::compressOneBlock(const void * buff, uint32_t length) {
+ size_t compressedLength = _tempBufferSize - 8;
+ snappy_status ret = snappy_compress((const char*)buff, length, _tempBuffer + 8,
+ &compressedLength);
+ if (ret == SNAPPY_OK) {
+ ((uint32_t*)_tempBuffer)[0] = bswap(length);
+ ((uint32_t*)_tempBuffer)[1] = bswap((uint32_t)compressedLength);
+ _stream->write(_tempBuffer, compressedLength + 8);
+ _compressedBytesWritten += (compressedLength + 8);
+ } else if (ret == SNAPPY_INVALID_INPUT) {
+ THROW_EXCEPTION(IOException, "compress SNAPPY_INVALID_INPUT");
+ } else if (ret == SNAPPY_BUFFER_TOO_SMALL) {
+ THROW_EXCEPTION(IOException, "compress SNAPPY_BUFFER_TOO_SMALL");
+ } else {
+ THROW_EXCEPTION(IOException, "compress snappy failed");
+ }
+}
+
+uint64_t SnappyCompressStream::maxCompressedLength(uint64_t origLength) {
+ return snappy_max_compressed_length(origLength);
+}
+
+//////////////////////////////////////////////////////////////
+
+SnappyDecompressStream::SnappyDecompressStream(InputStream * stream, uint32_t bufferSizeHint)
+ : BlockDecompressStream(stream, bufferSizeHint) {
+ init();
+}
+
+uint32_t SnappyDecompressStream::decompressOneBlock(uint32_t compressedSize, void * buff,
+ uint32_t length) {
+ if (compressedSize > _tempBufferSize) {
+ char * newBuffer = (char *)realloc(_tempBuffer, compressedSize);
+ if (newBuffer == NULL) {
+ THROW_EXCEPTION(OutOfMemoryException, "realloc failed");
+ }
+ _tempBuffer = newBuffer;
+ _tempBufferSize = compressedSize;
+ }
+ uint32_t rd = _stream->readFully(_tempBuffer, compressedSize);
+ if (rd != compressedSize) {
+ THROW_EXCEPTION(IOException, "readFully reached EOF");
+ }
+ _compressedBytesRead += rd;
+ size_t uncompressedLength = length;
+ snappy_status ret = snappy_uncompress(_tempBuffer, compressedSize, (char *)buff,
+ &uncompressedLength);
+ if (ret == SNAPPY_OK) {
+ return uncompressedLength;
+ } else if (ret == SNAPPY_INVALID_INPUT) {
+ THROW_EXCEPTION(IOException, "decompress SNAPPY_INVALID_INPUT");
+ } else if (ret == SNAPPY_BUFFER_TOO_SMALL) {
+ THROW_EXCEPTION(IOException, "decompress SNAPPY_BUFFER_TOO_SMALL");
+ } else {
+ THROW_EXCEPTION(IOException, "decompress snappy failed");
+ }
+}
+
+uint64_t SnappyDecompressStream::maxCompressedLength(uint64_t origLength) {
+ return snappy_max_compressed_length(origLength);
+}
+
+} // namespace NativeTask
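
A round-trip sketch against the snappy-c API used above (declared in
snappy-c.h below); main here is purely illustrative:

    #include <cstdlib>
    #include <cstring>
    #include "snappy-c.h"

    int main() {
      const char input[] = "snappy snappy snappy snappy";
      const size_t inputLen = sizeof(input);

      // Size the scratch buffer for the worst case, exactly as
      // SnappyCompressStream sizes its temp buffer via maxCompressedLength().
      size_t compressedLen = snappy_max_compressed_length(inputLen);
      char * compressed = (char *)malloc(compressedLen);
      if (snappy_compress(input, inputLen, compressed, &compressedLen)
          != SNAPPY_OK) {
        return 1;
      }

      char restored[sizeof(input)];
      size_t restoredLen = sizeof(restored);  // capacity in, actual size out
      snappy_status ret = snappy_uncompress(compressed, compressedLen,
                                            restored, &restoredLen);
      free(compressed);
      return (ret == SNAPPY_OK && restoredLen == inputLen
              && memcmp(input, restored, inputLen) == 0) ? 0 : 1;
    }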
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SNAPPYCODEC_H_
+#define SNAPPYCODEC_H_
+
+#include "Compressions.h"
+#include "BlockCodec.h"
+
+namespace NativeTask {
+
+class SnappyCompressStream : public BlockCompressStream {
+public:
+ SnappyCompressStream(OutputStream * stream, uint32_t bufferSizeHint);
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength);
+ virtual void compressOneBlock(const void * buff, uint32_t length);
+};
+
+class SnappyDecompressStream : public BlockDecompressStream {
+public:
+ SnappyDecompressStream(InputStream * stream, uint32_t bufferSizeHint);
+
+protected:
+ virtual uint64_t maxCompressedLength(uint64_t origLength);
+ virtual uint32_t decompressOneBlock(uint32_t compressedSize, void * buff, uint32_t length);
+};
+
+} // namespace NativeTask
+
+#endif /* SNAPPYCODEC_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2011 Martin Gieseking <ma...@uos.de>.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Plain C interface (a wrapper around the C++ implementation).
+ */
+
+#ifndef UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_
+#define UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+
+/*
+ * Return values; see the documentation for each function to know
+ * what each can return.
+ */
+typedef enum {
+ SNAPPY_OK = 0,
+ SNAPPY_INVALID_INPUT = 1,
+ SNAPPY_BUFFER_TOO_SMALL = 2
+} snappy_status;
+
+/*
+ * Takes the data stored in "input[0..input_length-1]" and stores
+ * it in the array pointed to by "compressed".
+ *
+ * <compressed_length> signals the space available in "compressed".
+ * If it is not at least equal to "snappy_max_compressed_length(input_length)",
+ * SNAPPY_BUFFER_TOO_SMALL is returned. After successful compression,
+ * <compressed_length> contains the true length of the compressed output,
+ * and SNAPPY_OK is returned.
+ *
+ * Example:
+ * size_t output_length = snappy_max_compressed_length(input_length);
+ * char* output = (char*)malloc(output_length);
+ * if (snappy_compress(input, input_length, output, &output_length)
+ * == SNAPPY_OK) {
+ * ... Process(output, output_length) ...
+ * }
+ * free(output);
+ */
+snappy_status snappy_compress(const char* input,
+ size_t input_length,
+ char* compressed,
+ size_t* compressed_length);
+
+/*
+ * Given data in "compressed[0..compressed_length-1]" generated by
+ * calling the snappy_compress routine, this routine stores
+ * the uncompressed data to
+ * uncompressed[0..uncompressed_length-1].
+ * Returns failure (a value not equal to SNAPPY_OK) if the message
+ * is corrupted and could not be decompressed.
+ *
+ * <uncompressed_length> signals the space available in "uncompressed".
+ * If it is not at least equal to the value returned by
+ * snappy_uncompressed_length for this stream, SNAPPY_BUFFER_TOO_SMALL
+ * is returned. After successful decompression, <uncompressed_length>
+ * contains the true length of the decompressed output.
+ *
+ * Example:
+ * size_t output_length;
+ * if (snappy_uncompressed_length(input, input_length, &output_length)
+ * != SNAPPY_OK) {
+ * ... fail ...
+ * }
+ * char* output = (char*)malloc(output_length);
+ * if (snappy_uncompress(input, input_length, output, &output_length)
+ * == SNAPPY_OK) {
+ * ... Process(output, output_length) ...
+ * }
+ * free(output);
+ */
+snappy_status snappy_uncompress(const char* compressed,
+ size_t compressed_length,
+ char* uncompressed,
+ size_t* uncompressed_length);
+
+/*
+ * Returns the maximal size of the compressed representation of
+ * input data that is "source_length" bytes in length.
+ */
+size_t snappy_max_compressed_length(size_t source_length);
+
+/*
+ * REQUIRES: "compressed[]" was produced by snappy_compress()
+ * Returns SNAPPY_OK and stores the length of the uncompressed data in
+ * *result normally. Returns SNAPPY_INVALID_INPUT on parsing error.
+ * This operation takes O(1) time.
+ */
+snappy_status snappy_uncompressed_length(const char* compressed,
+ size_t compressed_length,
+ size_t* result);
+
+/*
+ * Check if the contents of "compressed[]" can be uncompressed successfully:
+ * returns SNAPPY_OK if so, SNAPPY_INVALID_INPUT if not. Does not produce
+ * the uncompressed data.
+ * Takes time proportional to compressed_length, but is usually at least a
+ * factor of four faster than actual decompression.
+ */
+snappy_status snappy_validate_compressed_buffer(const char* compressed,
+ size_t compressed_length);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_ */
\ No newline at end of file
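
A sketch of the validate-then-size pattern the last two declarations enable
(safe_uncompressed_length is an illustrative name, not part of the header):

    #include "snappy-c.h"

    // Cheap integrity check before allocating output: validation runs in
    // O(compressed_length) and produces no uncompressed data.
    int safe_uncompressed_length(const char * compressed,
                                 size_t compressed_length, size_t * result) {
      if (snappy_validate_compressed_buffer(compressed, compressed_length)
          != SNAPPY_OK) {
        return -1;  // corrupt or truncated input
      }
      return snappy_uncompressed_length(compressed, compressed_length, result)
          == SNAPPY_OK ? 0 : -1;
    }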