You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2014/07/17 19:45:01 UTC
svn commit: r1611413 [12/18] - in
/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client:
./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Checksum.h"
+#include "Streams.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////
+
+/**
+ * Default implementation of seek() for streams without random access.
+ * It ignores `position` and reports an UnsupportException through
+ * THROW_EXCEPTION; the method is virtual, so subclasses may replace it.
+ */
+void InputStream::seek(uint64_t position) {
+ THROW_EXCEPTION(UnsupportException, "seek not support");
+}
+
+/**
+ * Default implementation of tell() for streams that do not track a position.
+ * It reports an UnsupportException through THROW_EXCEPTION and has no
+ * return statement of its own.
+ */
+uint64_t InputStream::tell() {
+ THROW_EXCEPTION(UnsupportException, "tell not support");
+}
+
+/**
+ * Call read() repeatedly until `length` bytes have been stored in `buff`
+ * or read() returns a value <= 0.
+ *
+ * @param buff   destination buffer, at least `length` bytes
+ * @param length number of bytes wanted
+ * @return the number of bytes stored (less than `length` if read() stopped
+ *         early), -1 if read() returned <= 0 before any byte was stored,
+ *         or 0 when `length` is 0
+ */
+int32_t InputStream::readFully(void * buff, uint32_t length) {
+  char * cursor = (char *)buff;
+  uint32_t remaining = length;
+  int32_t total = 0;
+  while (remaining > 0) {
+    const int32_t got = read(cursor, remaining);
+    if (got <= 0) {
+      // Stream stopped early: report the partial count, or -1 if empty.
+      if (total > 0) {
+        return total;
+      }
+      return -1;
+    }
+    total += got;
+    cursor += got;
+    remaining -= got;
+  }
+  return total;
+}
+
+/**
+ * Copy everything this stream yields into `out`.
+ *
+ * Data is moved through a temporary heap buffer of `bufferHint` bytes;
+ * copying stops at the first read() that returns <= 0.
+ *
+ * @param out        stream receiving the data
+ * @param bufferHint size in bytes of the temporary copy buffer
+ */
+void InputStream::readAllTo(OutputStream & out, uint32_t bufferHint) {
+  char * buffer = new char[bufferHint];
+  try {
+    while (true) {
+      int32_t rd = read(buffer, bufferHint);
+      if (rd <= 0) {
+        break;
+      }
+      out.write(buffer, rd);
+    }
+  } catch (...) {
+    // read()/write() may raise; release the buffer before propagating.
+    delete[] buffer;
+    throw;
+  }
+  // Array form is required to match the new char[] above.
+  delete[] buffer;
+}
+
+/////////////////////////////////////////////////////////////
+
+/**
+ * Default implementation of tell() for output streams that do not track a
+ * position. It reports an UnsupportException through THROW_EXCEPTION.
+ */
+uint64_t OutputStream::tell() {
+ THROW_EXCEPTION(UnsupportException, "tell not support");
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Wrap `stream` and start a fresh checksum of kind `type`.
+ * The read limit starts at -1, which read() treats as "no limit".
+ */
+ChecksumInputStream::ChecksumInputStream(InputStream * stream, ChecksumType type)
+    : FilterInputStream(stream),
+      _type(type),
+      _checksum(Checksum::init(type)),
+      _limit(-1) {
+}
+
+/**
+ * Restart the running checksum by re-initialising it with
+ * Checksum::init() for this stream's checksum type.
+ */
+void ChecksumInputStream::resetChecksum() {
+ _checksum = Checksum::init(_type);
+}
+
+/**
+ * Return the result of Checksum::getValue() applied to the running
+ * checksum state; the state itself is not modified here.
+ */
+uint32_t ChecksumInputStream::getChecksum() {
+ return Checksum::getValue(_type, _checksum);
+}
+
+/**
+ * Read from the wrapped stream and fold the bytes obtained into the
+ * running checksum.
+ *
+ * A negative limit means unlimited. A limit of 0 makes the call return -1
+ * without touching the wrapped stream. A positive limit caps the request
+ * and is decreased by the number of bytes actually read.
+ *
+ * @return whatever the wrapped read() returned, or -1 when the limit is 0
+ */
+int32_t ChecksumInputStream::read(void * buff, uint32_t length) {
+  if (_limit == 0) {
+    return -1;
+  }
+  const bool limited = _limit > 0;
+  uint32_t request = length;
+  if (limited && _limit < (int64_t)length) {
+    request = (uint32_t)_limit;
+  }
+  const int32_t got = _stream->read(buff, request);
+  if (got > 0) {
+    if (limited) {
+      _limit -= got;
+    }
+    Checksum::update(_type, _checksum, buff, got);
+  }
+  return got;
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Wrap `stream` and start a fresh checksum of kind `type`.
+ */
+ChecksumOutputStream::ChecksumOutputStream(OutputStream * stream, ChecksumType type)
+    : FilterOutputStream(stream),
+      _type(type),
+      _checksum(Checksum::init(type)) {
+}
+
+/**
+ * Restart the running checksum by re-initialising it with
+ * Checksum::init() for this stream's checksum type.
+ */
+void ChecksumOutputStream::resetChecksum() {
+ _checksum = Checksum::init(_type);
+}
+
+/**
+ * Return the result of Checksum::getValue() applied to the running
+ * checksum state; the state itself is not modified here.
+ */
+uint32_t ChecksumOutputStream::getChecksum() {
+ return Checksum::getValue(_type, _checksum);
+}
+
+/**
+ * Fold `length` bytes from `buff` into the running checksum, then hand the
+ * same bytes to the wrapped stream.
+ */
+void ChecksumOutputStream::write(const void * buff, uint32_t length) {
+ /* The checksum is updated before the wrapped write() is invoked. */
+ Checksum::update(_type, _checksum, buff, length);
+ _stream->write(buff, length);
+}
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAMS_H_
+#define STREAMS_H_
+
+#include "util/Checksum.h"
+
+namespace NativeTask {
+
+class OutputStream;
+
+/**
+ * Base class for byte input streams.
+ *
+ * The inline defaults describe an empty stream: read() returns -1 and
+ * close() does nothing. seek(), tell(), readFully() and readAllTo() are
+ * only declared here and defined out of line.
+ */
+class InputStream {
+public:
+ InputStream() {
+ }
+
+ virtual ~InputStream() {
+ }
+
+ /** Move the read position to `position`. */
+ virtual void seek(uint64_t position);
+
+ /** Report the current read position. */
+ virtual uint64_t tell();
+
+ /**
+ * Read up to `length` bytes into `buff`.
+ * This base version reads nothing and always returns -1.
+ */
+ virtual int32_t read(void * buff, uint32_t length) {
+ return -1;
+ }
+
+ /** Close the stream; the base version does nothing. */
+ virtual void close() {
+ }
+
+ /** Read into `buff`, aiming for `length` bytes in total. */
+ virtual int32_t readFully(void * buff, uint32_t length);
+
+ /**
+ * Write the data read from this stream to `out`; `bufferHint`
+ * defaults to 4 KiB (1024 * 4).
+ */
+ void readAllTo(OutputStream & out, uint32_t bufferHint = 1024 * 4);
+};
+
+/**
+ * Base class for byte output streams.
+ *
+ * The inline defaults discard everything: write(), flush() and close()
+ * have empty bodies. tell() is only declared here and defined out of line.
+ */
+class OutputStream {
+public:
+ OutputStream() {
+ }
+
+ virtual ~OutputStream() {
+ }
+
+ /** Report the current write position. */
+ virtual uint64_t tell();
+
+ /** Write `length` bytes from `buff`; the base version does nothing. */
+ virtual void write(const void * buff, uint32_t length) {
+ }
+
+ /** Flush buffered data; the base version does nothing. */
+ virtual void flush() {
+ }
+
+ /** Close the stream; the base version does nothing. */
+ virtual void close() {
+ }
+};
+
+/**
+ * Input stream that forwards seek(), tell() and read() to another
+ * InputStream held by pointer.
+ *
+ * The destructor body is empty, so the wrapped stream is not deleted here,
+ * and close() is not overridden, so InputStream::close() is what runs.
+ */
+class FilterInputStream : public InputStream {
+protected:
+ /** Wrapped stream that receives the forwarded calls. */
+ InputStream * _stream;
+public:
+ FilterInputStream(InputStream * stream)
+ : _stream(stream) {
+ }
+
+ virtual ~FilterInputStream() {
+ }
+
+ /** Replace the wrapped stream pointer. */
+ void setStream(InputStream * stream) {
+ _stream = stream;
+ }
+
+ /** Return the wrapped stream pointer. */
+ InputStream * getStream() {
+ return _stream;
+ }
+
+ virtual void seek(uint64_t position) {
+ _stream->seek(position);
+ }
+
+ virtual uint64_t tell() {
+ return _stream->tell();
+ }
+
+ virtual int32_t read(void * buff, uint32_t length) {
+ return _stream->read(buff, length);
+ }
+};
+
+/**
+ * Output stream that forwards tell(), write() and flush() to another
+ * OutputStream held by pointer.
+ *
+ * The destructor body is empty, so the wrapped stream is not deleted here.
+ */
+class FilterOutputStream : public OutputStream {
+protected:
+ /** Wrapped stream that receives the forwarded calls. */
+ OutputStream * _stream;
+public:
+ FilterOutputStream(OutputStream * stream)
+ : _stream(stream) {
+ }
+
+ virtual ~FilterOutputStream() {
+ }
+
+ /** Replace the wrapped stream pointer. */
+ void setStream(OutputStream * stream) {
+ _stream = stream;
+ }
+
+ /** Return the wrapped stream pointer. */
+ OutputStream * getStream() {
+ return _stream;
+ }
+
+ virtual uint64_t tell() {
+ return _stream->tell();
+ }
+
+ virtual void write(const void * buff, uint32_t length) {
+ _stream->write(buff, length);
+ }
+
+ virtual void flush() {
+ _stream->flush();
+ }
+
+ /**
+ * Flushes through flush(); note that close() of the wrapped stream is
+ * not called from here.
+ */
+ virtual void close() {
+ flush();
+ }
+};
+
+/**
+ * Filter stream that reads at most a configurable number of bytes from
+ * the wrapped stream.
+ *
+ * A negative limit means "unlimited"; a limit of 0 makes read() return -1
+ * without touching the wrapped stream.
+ */
+class LimitInputStream : public FilterInputStream {
+protected:
+  /** Bytes still allowed to be read; negative disables the limit. */
+  int64_t _limit;
+public:
+  LimitInputStream(InputStream * stream, int64_t limit)
+      : FilterInputStream(stream), _limit(limit) {
+  }
+
+  virtual ~LimitInputStream() {
+  }
+
+  /** Return the remaining limit. */
+  int64_t getLimit() {
+    return _limit;
+  }
+
+  /** Set the remaining limit. */
+  void setLimit(int64_t limit) {
+    _limit = limit;
+  }
+
+  /**
+   * Read up to min(length, remaining limit) bytes and decrease the limit
+   * by the number of bytes the wrapped stream actually delivered.
+   */
+  virtual int32_t read(void * buff, uint32_t length) {
+    if (_limit == 0) {
+      return -1;
+    }
+    if (_limit < 0) {
+      return _stream->read(buff, length);
+    }
+    const int64_t request = (_limit < length) ? _limit : length;
+    const int32_t got = _stream->read(buff, request);
+    if (got > 0) {
+      _limit -= got;
+    }
+    return got;
+  }
+};
+
+/**
+ * Filter input stream that keeps a running checksum and an optional
+ * read limit. resetChecksum(), getChecksum(), read() and the constructor
+ * are only declared here and defined out of line.
+ */
+class ChecksumInputStream : public FilterInputStream {
+protected:
+ /** Checksum algorithm selector passed to the Checksum helpers. */
+ ChecksumType _type;
+ /** Running checksum state. */
+ uint32_t _checksum;
+ /** Stored read limit, exposed through getLimit()/setLimit(). */
+ int64_t _limit;
+public:
+ ChecksumInputStream(InputStream * stream, ChecksumType type);
+
+ virtual ~ChecksumInputStream() {
+ }
+
+ /** Return the stored limit. */
+ int64_t getLimit() {
+ return _limit;
+ }
+
+ /** Set the stored limit. */
+ void setLimit(int64_t limit) {
+ _limit = limit;
+ }
+
+ void resetChecksum();
+
+ uint32_t getChecksum();
+
+ virtual int32_t read(void * buff, uint32_t length);
+};
+
+/**
+ * Filter output stream that keeps a running checksum. All member
+ * functions except the destructor are only declared here and defined
+ * out of line.
+ */
+class ChecksumOutputStream : public FilterOutputStream {
+protected:
+ /** Checksum algorithm selector passed to the Checksum helpers. */
+ ChecksumType _type;
+ /** Running checksum state. */
+ uint32_t _checksum;
+public:
+ ChecksumOutputStream(OutputStream * stream, ChecksumType type);
+
+ virtual ~ChecksumOutputStream() {
+ }
+
+ void resetChecksum();
+
+ uint32_t getChecksum();
+
+ virtual void write(const void * buff, uint32_t length);
+
+};
+
+} // namespace NativeTask
+
+#endif /* STREAMS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TaskCounters.h"
+
+namespace NativeTask {
+
+/*
+ * DEFINE_COUNTER(X) defines the static member TaskCounters::X and sets it
+ * to the string "X", so each counter constant holds its own identifier.
+ * The macro supplies the terminating semicolon itself.
+ */
+#define DEFINE_COUNTER(name) const char * TaskCounters::name = #name;
+
+const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
+
+DEFINE_COUNTER(MAP_INPUT_RECORDS)
+DEFINE_COUNTER(MAP_OUTPUT_RECORDS)
+DEFINE_COUNTER(MAP_SKIPPED_RECORDS)
+DEFINE_COUNTER(MAP_INPUT_BYTES)
+DEFINE_COUNTER(MAP_OUTPUT_BYTES)
+DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES)
+DEFINE_COUNTER(COMBINE_INPUT_RECORDS)
+DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_INPUT_GROUPS)
+DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES)
+DEFINE_COUNTER(REDUCE_INPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS)
+DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS)
+DEFINE_COUNTER(SPILLED_RECORDS)
+
+const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
+
+DEFINE_COUNTER(FILE_BYTES_READ)
+DEFINE_COUNTER(FILE_BYTES_WRITTEN)
+
+/* The helper is only meant for the definitions above. */
+#undef DEFINE_COUNTER
+
+} // namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TASKCOUNTERS_H_
+#define TASKCOUNTERS_H_
+
+namespace NativeTask {
+
+/**
+ * Holder for the string names of task counters and counter groups.
+ * Only declarations live here; the values are assigned where the static
+ * members are defined.
+ */
+class TaskCounters {
+public:
+ /** Name of the group for the task counters listed below. */
+ static const char * TASK_COUNTER_GROUP;
+
+ static const char * MAP_INPUT_RECORDS;
+ static const char * MAP_OUTPUT_RECORDS;
+ static const char * MAP_SKIPPED_RECORDS;
+ static const char * MAP_INPUT_BYTES;
+ static const char * MAP_OUTPUT_BYTES;
+ static const char * MAP_OUTPUT_MATERIALIZED_BYTES;
+ static const char * COMBINE_INPUT_RECORDS;
+ static const char * COMBINE_OUTPUT_RECORDS;
+ static const char * REDUCE_INPUT_GROUPS;
+ static const char * REDUCE_SHUFFLE_BYTES;
+ static const char * REDUCE_INPUT_RECORDS;
+ static const char * REDUCE_OUTPUT_RECORDS;
+ static const char * REDUCE_SKIPPED_GROUPS;
+ static const char * REDUCE_SKIPPED_RECORDS;
+ static const char * SPILLED_RECORDS;
+
+ /** Name of the group for the file system counters listed below. */
+ static const char * FILESYSTEM_COUNTER_GROUP;
+
+ static const char * FILE_BYTES_READ;
+ static const char * FILE_BYTES_WRITTEN;
+};
+
+} // namespace NativeTask
+
+#endif /* TASKCOUNTERS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TRACKING_COLLECTOR_H
+#define TRACKING_COLLECTOR_H
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+/**
+ * Collector wrapper that bumps a Counter once for every record and then
+ * passes the record on to another Collector.
+ *
+ * NOTE(review): Collector and Counter are not declared by the headers
+ * included in this file; presumably they must be visible before this
+ * header is included -- confirm against the including files.
+ */
+class TrackingCollector : public Collector {
+protected:
+ /** Collector that receives every record after counting. */
+ Collector * _collector;
+ /** Counter increased once per collected record. */
+ Counter * _counter;
+public:
+ TrackingCollector(Collector * collector, Counter * counter)
+ : _collector(collector), _counter(counter) {
+ }
+
+ /** Count the record, then forward it unchanged. */
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
+ _counter->increase();
+ _collector->collect(key, keyLen, value, valueLen);
+ }
+
+ /** Count the record, then forward it with its partition unchanged. */
+ virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen,
+ int32_t partition) {
+ _counter->increase();
+ _collector->collect(key, keyLen, value, valueLen, partition);
+ }
+};
+
+} //namespace NativeTask
+
+#endif //TRACKING_COLLECTOR_H
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMONS_H_
+#define COMMONS_H_
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+#include <memory.h>
+#include <fcntl.h>
+
+#include <limits>
+#include <string>
+#include <vector>
+#include <list>
+#include <set>
+#include <map>
+#include <algorithm>
+
+#include "primitives.h"
+#include "Log.h"
+#include "NativeTask.h"
+
+#include "Constants.h"
+
+#include "Iterator.h"
+#include "TrackingCollector.h"
+
+#endif /* COMMONS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "util/SyncUtils.h"
+#include "jniutils.h"
+
+using namespace NativeTask;
+
+/**
+ * Return the JavaVM of this process, creating one when none exists.
+ *
+ * The pointer is cached in a function-local static. On the first call an
+ * already created VM is looked up with JNI_GetCreatedJavaVMs; if there is
+ * none, a new VM is created whose -Djava.class.path option is taken from
+ * the CLASSPATH environment variable.
+ *
+ * Failures (JNI_GetCreatedJavaVMs error, CLASSPATH unset, JNI_CreateJavaVM
+ * error) are reported as NativeTask::HadoopException via THROW_EXCEPTION.
+ */
+JavaVM * JNU_GetJVM(void) {
+  static JavaVM * gJVM = NULL;
+  static Lock GJVMLock;
+  // NOTE(review): this fast-path read happens without holding GJVMLock,
+  // exactly as before; confirm that is acceptable on supported platforms.
+  if (gJVM != NULL) {
+    return gJVM;
+  }
+  {
+    ScopeLock<Lock> autolock(GJVMLock);
+    if (gJVM == NULL) {
+      jint noVMs = 0;
+      jint rv = JNI_GetCreatedJavaVMs(&gJVM, 1, &noVMs);
+      if (rv != 0) {
+        THROW_EXCEPTION(NativeTask::HadoopException, "JNI_GetCreatedJavaVMs failed");
+      }
+      if (noVMs == 0) {
+        const char * hadoopClassPath = getenv("CLASSPATH");
+        if (hadoopClassPath == NULL) {
+          THROW_EXCEPTION(NativeTask::HadoopException, "Environment variable CLASSPATH not set!");
+          return NULL;
+        }
+        // std::string owns the option text, so it is released on every
+        // exit path, including the failure path below.
+        std::string optHadoopClassPath("-Djava.class.path=");
+        optHadoopClassPath += hadoopClassPath;
+
+        // Fixed-size array instead of a variable-length array.
+        JavaVMOption options[1];
+        options[0].optionString = const_cast<char *>(optHadoopClassPath.c_str());
+        options[0].extraInfo = NULL;
+
+        //Create the VM
+        JavaVMInitArgs vm_args;
+        vm_args.version = JNI_VERSION_1_6;
+        vm_args.options = options;
+        vm_args.nOptions = 1;
+        vm_args.ignoreUnrecognized = 1;
+        JNIEnv * jenv = NULL;
+        rv = JNI_CreateJavaVM(&gJVM, (void**)&jenv, &vm_args);
+        if (rv != 0) {
+          THROW_EXCEPTION(NativeTask::HadoopException, "JNI_CreateJavaVM failed");
+          return NULL;
+        }
+      }
+    }
+  }
+  return gJVM;
+}
+
+/**
+ * Attach the calling thread to the JVM obtained from JNU_GetJVM() and
+ * return the JNIEnv that AttachCurrentThread filled in.
+ * A non-zero result from AttachCurrentThread is reported as
+ * NativeTask::HadoopException via THROW_EXCEPTION.
+ */
+JNIEnv* JNU_GetJNIEnv(void) {
+  JNIEnv * env;
+  JavaVM * const vm = JNU_GetJVM();
+  const jint status = vm->AttachCurrentThread((void **)&env, NULL);
+  if (status != 0) {
+    THROW_EXCEPTION(NativeTask::HadoopException, "Call to AttachCurrentThread failed");
+  }
+  return env;
+}
+
+/**
+ * Attach the calling thread by calling JNU_GetJNIEnv() and discarding the
+ * JNIEnv pointer it returns.
+ */
+void JNU_AttachCurrentThread() {
+ JNU_GetJNIEnv();
+}
+
+/**
+ * Detach the calling thread from the JVM obtained from JNU_GetJVM().
+ * A non-zero result from DetachCurrentThread is reported as
+ * NativeTask::HadoopException via THROW_EXCEPTION.
+ */
+void JNU_DetachCurrentThread() {
+  JavaVM * const vm = JNU_GetJVM();
+  const jint status = vm->DetachCurrentThread();
+  if (status != 0) {
+    THROW_EXCEPTION(NativeTask::HadoopException, "Call to DetachCurrentThread failed");
+  }
+}
+
+/**
+ * Look up the Java class `name` and, if it is found, call ThrowNew on it
+ * with the message `msg`.
+ *
+ * When FindClass returns NULL, ThrowNew is skipped. The local reference is
+ * released with DeleteLocalRef in both cases, so DeleteLocalRef also
+ * receives NULL when the lookup failed.
+ */
+void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg) {
+ jclass cls = jenv->FindClass(name);
+ if (cls != NULL) {
+ jenv->ThrowNew(cls, msg);
+ }
+ jenv->DeleteLocalRef(cls);
+}
+
+/**
+ * Copy the contents of a Java byte array into a std::string.
+ *
+ * @param jenv JNI environment of the calling thread
+ * @param src  source array; may be NULL
+ * @return a string holding the array's bytes (embedded zero bytes are
+ *         kept), or an empty string when `src` is NULL or has length 0
+ */
+std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src) {
+  if (NULL == src) {
+    return std::string();
+  }
+  const jsize len = jenv->GetArrayLength(src);
+  std::string ret(len, '\0');
+  if (len > 0) {
+    // Write through the mutable element access rather than casting away
+    // the constness of data().
+    jenv->GetByteArrayRegion(src, 0, len, (jbyte*)&ret[0]);
+  }
+  return ret;
+}
\ No newline at end of file
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JNIUTILS_H_
+#define JNIUTILS_H_
+
+#include <string>
+#include <jni.h>
+
+/**
+ * Get current JavaVM, if none then try to create one.
+ */
+JavaVM * JNU_GetJVM(void);
+
+/**
+ * Get JNIEnv for current thread.
+ */
+JNIEnv* JNU_GetJNIEnv(void);
+
+/**
+ * Attach currentThread, same effect as JNU_GetJNIEnv.
+ */
+void JNU_AttachCurrentThread();
+
+/**
+ * Detach the current thread. Call this if the current thread
+ * was created on the native side and has previously called
+ * JNU_AttachCurrentThread.
+ */
+void JNU_DetachCurrentThread();
+
+/**
+ * Throw a java exception.
+ */
+void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg);
+
+/**
+ * Convert a java byte array to c++ std::string
+ */
+std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src);
+
+#endif /* JNIUTILS_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,58 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_hadoop_mapred_nativetask_NativeRuntime */
+
+#ifndef _Included_org_apache_hadoop_mapred_nativetask_NativeRuntime
+#define _Included_org_apache_hadoop_mapred_nativetask_NativeRuntime
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNIRelease
+ * Signature: ()V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(JNIEnv *,
+ jclass);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNIConfigure
+ * Signature: ([[B)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
+ JNIEnv *, jclass, jobjectArray);
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNICreateNativeObject
+ * Signature: ([B)J
+ */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject(
+ JNIEnv *, jclass, jbyteArray);
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNICreateDefaultNativeObject
+ * Signature: ([B)J
+ */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
+ JNIEnv *, jclass, jbyteArray);
+
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNIReleaseNativeObject
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
+ JNIEnv *, jclass, jlong);
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNIRegisterModule
+ * Signature: ([B[B)I
+ */JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
+ JNIEnv *, jclass, jbyteArray, jbyteArray);
+/*
+ * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method: JNIUpdateStatus
+ * Signature: ()[B
+ */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
+ JNIEnv *, jclass);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * High performance primitive functions
+ *
+ **/
+
+#ifndef PRIMITIVES_H_
+#define PRIMITIVES_H_
+
+#include <stddef.h>
+#include <stdint.h>
+#include <assert.h>
+#include <string.h>
+#include <string>
+
+/*
+ * Branch-prediction hints: likely(x) marks a condition that is expected to be
+ * true, unlikely(x) one that is expected to be false.
+ *
+ * The argument is normalised with !!(x) so that any truthy value (a count, a
+ * pointer, ...) is compared against the expected 1/0 correctly; the macros
+ * therefore evaluate to 0 or 1 and are meant to be used as conditions only.
+ * On compilers without __builtin_expect they reduce to the plain condition.
+ */
+#ifdef __GNUC__
+#define likely(x) __builtin_expect(!!(x), 1)
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#else
+#define likely(x) (!!(x))
+#define unlikely(x) (!!(x))
+#endif
+
+// Define SIMPLE_MEMCPY (here or on the compiler command line) to replace the
+// C library memcpy with the hand-written small-copy routine below.
+//#define SIMPLE_MEMCPY
+
+#if !defined(SIMPLE_MEMCPY)
+#define simple_memcpy memcpy
+#define simple_memcpy2 memcpy
+#else
+
+/**
+ * Copy routine tuned for very small lengths (normally < 64 bytes).
+ *
+ * src and dest must not overlap. Lengths below 128 are copied with a handful
+ * of fixed-size 2/4/8-byte moves, some of which deliberately overlap each
+ * other so that no per-byte tail loop is needed; longer copies are handed to
+ * the C library memcpy. The fixed-size moves are written as constant-size
+ * memcpy calls, which optimizing compilers typically turn into single
+ * load/store instructions while staying valid for unaligned buffers.
+ *
+ * @param dest: dest buffer, at least len writable bytes
+ * @param src: src buffer, at least len readable bytes
+ * @param len: number of bytes to copy; 0 is allowed and copies nothing
+ */
+inline void simple_memcpy(void * dest, const void * src, size_t len) {
+  const uint8_t * src8 = (const uint8_t*)src;
+  uint8_t * dest8 = (uint8_t*)dest;
+  switch (len) {
+  case 0:
+    return;
+  case 1:
+    dest8[0] = src8[0];
+    return;
+  case 2:
+    ::memcpy(dest8, src8, 2);
+    return;
+  case 3:
+    ::memcpy(dest8, src8, 2);
+    dest8[2] = src8[2];
+    return;
+  case 4:
+    ::memcpy(dest8, src8, 4);
+    return;
+  }
+  if (len < 8) {
+    // 5..7 bytes: first four and last four bytes, overlapping in the middle.
+    ::memcpy(dest8, src8, 4);
+    ::memcpy(dest8 + len - 4, src8 + len - 4, 4);
+    return;
+  }
+  if (len < 128) {
+    // Walk backwards in 8-byte steps starting from the last 8 bytes, then
+    // finish with the first 8 bytes; the final two moves may overlap.
+    int64_t cur = (int64_t)len - 8;
+    while (cur > 0) {
+      ::memcpy(dest8 + cur, src8 + cur, 8);
+      cur -= 8;
+    }
+    ::memcpy(dest8, src8, 8);
+    return;
+  }
+  ::memcpy(dest, src, len);
+}
+
+/**
+ * Same contract as simple_memcpy. Provided so that the name simple_memcpy2 is
+ * available whether or not SIMPLE_MEMCPY is defined.
+ */
+inline void simple_memcpy2(void * dest, const void * src, size_t len) {
+  simple_memcpy(dest, src, len);
+}
+
+#endif
+
+/**
+ * Reverses the byte order of a 32-bit value
+ * (little-endian to big-endian or vice versa).
+ *
+ * Uses the compiler builtin where available, so it builds on any
+ * architecture GCC/Clang target; other compilers get an equivalent
+ * shift-and-mask implementation.
+ *
+ * @param val: value to convert
+ * @return val with its four bytes in reverse order
+ */
+inline uint32_t bswap(uint32_t val) {
+#if defined(__GNUC__)
+  return __builtin_bswap32(val);
+#else
+  return ((val & 0x000000ffU) << 24) | ((val & 0x0000ff00U) << 8)
+      | ((val & 0x00ff0000U) >> 8) | ((val & 0xff000000U) >> 24);
+#endif
+}
+
+/**
+ * Reverses the byte order of a 64-bit value
+ * (little-endian to big-endian or vice versa).
+ *
+ * Uses the compiler builtin where available; other compilers get a
+ * three-step swap (adjacent bytes, then adjacent 16-bit groups, then the two
+ * 32-bit halves).
+ *
+ * @param val: value to convert
+ * @return val with its eight bytes in reverse order
+ */
+inline uint64_t bswap64(uint64_t val) {
+#if defined(__GNUC__)
+  return __builtin_bswap64(val);
+#else
+  val = ((val & 0x00ff00ff00ff00ffULL) << 8) | ((val >> 8) & 0x00ff00ff00ff00ffULL);
+  val = ((val & 0x0000ffff0000ffffULL) << 16) | ((val >> 16) & 0x0000ffff0000ffffULL);
+  return (val << 32) | (val >> 32);
+#endif
+}
+
+/**
+ * Reads the 4 bytes at p as a big-endian unsigned integer, so that integer
+ * order equals byte-wise (memcmp) order. Independent of host byte order and
+ * of the alignment of p.
+ */
+inline uint32_t fmemcmp_load_be32(const uint8_t * p) {
+  return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16)
+      | ((uint32_t)p[2] << 8) | (uint32_t)p[3];
+}
+
+/**
+ * Reads the 8 bytes at p as a big-endian unsigned integer; see
+ * fmemcmp_load_be32.
+ */
+inline uint64_t fmemcmp_load_be64(const uint8_t * p) {
+  return ((uint64_t)fmemcmp_load_be32(p) << 32) | (uint64_t)fmemcmp_load_be32(p + 4);
+}
+
+/**
+ * Fast memcmp: orders two buffers of the same length as unsigned bytes.
+ *
+ * When BUILDIN_MEMCMP is defined this simply forwards to memcmp. Otherwise
+ * short inputs are handled with a few byte or 4-byte comparisons, and longer
+ * inputs are scanned 8 bytes at a time with a final (possibly overlapping)
+ * 8-byte comparison of the tail. Buffers need no particular alignment.
+ *
+ * @param src: first buffer, at least len readable bytes
+ * @param dest: second buffer, at least len readable bytes
+ * @param len: number of bytes to compare
+ * @return 0 if equal, negative if src orders before dest, positive if after.
+ *         Only the sign is meaningful: for len < 8 the value is a difference
+ *         of bytes or 4-byte groups, for len >= 8 it is 1 or -1.
+ */
+inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {
+
+#ifdef BUILDIN_MEMCMP
+  return memcmp(src, dest, len);
+#else
+
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  if (len < 4) {
+    // 0..3 bytes: the first differing byte decides.
+    for (uint32_t i = 0; i < len; i++) {
+      if (src8[i] != dest8[i]) {
+        return (int64_t)src8[i] - (int64_t)dest8[i];
+      }
+    }
+    return 0;
+  }
+  if (len < 8) {
+    // 4..7 bytes: leading 4 bytes, then the trailing 4 bytes. The two groups
+    // overlap, which is harmless because the overlap is already known equal.
+    int64_t ret = (int64_t)fmemcmp_load_be32(src8) - (int64_t)fmemcmp_load_be32(dest8);
+    if (ret) {
+      return ret;
+    }
+    return (int64_t)fmemcmp_load_be32(src8 + len - 4)
+        - (int64_t)fmemcmp_load_be32(dest8 + len - 4);
+  }
+  uint64_t l;
+  uint64_t r;
+  // Largest multiple of 8 that is <= len.
+  const uint32_t end = len & ~(uint32_t)7;
+  for (uint32_t cur = 0; cur < end; cur += 8) {
+    // Equality is tested on the raw words; the byte order only matters once
+    // a difference has been found.
+    memcpy(&l, src8 + cur, 8);
+    memcpy(&r, dest8 + cur, 8);
+    if (l != r) {
+      return fmemcmp_load_be64(src8 + cur) > fmemcmp_load_be64(dest8 + cur) ? 1 : -1;
+    }
+  }
+  if (end != len) {
+    // Remaining 1..7 bytes: compare the last 8 bytes of the buffers, which
+    // overlap bytes already known to be equal.
+    memcpy(&l, src8 + len - 8, 8);
+    memcpy(&r, dest8 + len - 8, 8);
+    if (l != r) {
+      return fmemcmp_load_be64(src8 + len - 8) > fmemcmp_load_be64(dest8 + len - 8) ? 1 : -1;
+    }
+  }
+  return 0;
+#endif
+}
+
+/**
+ * Orders two buffers of possibly different lengths: the common prefix is
+ * compared with the three-argument fmemcmp, and if it is equal the shorter
+ * buffer orders first.
+ *
+ * @param src: first buffer
+ * @param dest: second buffer
+ * @param srcLen: length of src in bytes
+ * @param destLen: length of dest in bytes
+ * @return 0 if equal, negative if src orders before dest, positive if after
+ */
+inline int64_t fmemcmp(const char * src, const char * dest, uint32_t srcLen, uint32_t destLen) {
+  const uint32_t common = (destLen < srcLen) ? destLen : srcLen;
+  const int64_t prefixOrder = fmemcmp(src, dest, common);
+  return prefixOrder != 0 ? prefixOrder : (int64_t)srcLen - (int64_t)destLen;
+}
+
+/**
+ * Fast memory equal: tells whether two buffers of the same length hold the
+ * same bytes.
+ *
+ * When BUILDIN_MEMCMP is defined this forwards to memcmp. Otherwise short
+ * inputs are handled with a few byte or 4-byte comparisons, and longer inputs
+ * are scanned front to back 8 bytes at a time, finishing with a (possibly
+ * overlapping) comparison of the last 8 bytes. Words are read with fixed-size
+ * memcpy, so the buffers need no particular alignment.
+ *
+ * @param src: first buffer, at least len readable bytes
+ * @param dest: second buffer, at least len readable bytes
+ * @param len: number of bytes to compare
+ * @return true if the first len bytes of both buffers are identical
+ */
+inline bool fmemeq(const char * src, const char * dest, uint32_t len) {
+#ifdef BUILDIN_MEMCMP
+  return 0 == memcmp(src, dest, len);
+#else
+
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  if (len < 4) {
+    // 0..3 bytes: plain byte comparison.
+    for (uint32_t i = 0; i < len; i++) {
+      if (src8[i] != dest8[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+  if (len < 8) {
+    // 4..7 bytes: leading and trailing 4-byte groups, overlapping in between.
+    uint32_t l;
+    uint32_t r;
+    memcpy(&l, src8, 4);
+    memcpy(&r, dest8, 4);
+    if (l != r) {
+      return false;
+    }
+    memcpy(&l, src8 + len - 4, 4);
+    memcpy(&r, dest8 + len - 4, 4);
+    return l == r;
+  }
+  uint64_t l;
+  uint64_t r;
+  // Largest multiple of 8 that is <= len.
+  const uint32_t end = len & ~(uint32_t)7;
+  for (uint32_t cur = 0; cur < end; cur += 8) {
+    memcpy(&l, src8 + cur, 8);
+    memcpy(&r, dest8 + cur, 8);
+    if (l != r) {
+      return false;
+    }
+  }
+  if (end != len) {
+    // Remaining 1..7 bytes are covered by re-reading the last 8 bytes.
+    memcpy(&l, src8 + len - 8, 8);
+    memcpy(&r, dest8 + len - 8, 8);
+    if (l != r) {
+      return false;
+    }
+  }
+  return true;
+#endif
+}
+
+/**
+ * Tells whether two buffers, each with its own length, are identical.
+ * Buffers of different lengths are never equal; otherwise the bytes are
+ * compared with the three-argument fmemeq.
+ *
+ * Note the argument order: each buffer is followed by its own length.
+ *
+ * @param src: first buffer
+ * @param srcLen: length of src in bytes
+ * @param dest: second buffer
+ * @param destLen: length of dest in bytes
+ * @return true if both lengths and all bytes match
+ */
+inline bool fmemeq(const char * src, uint32_t srcLen, const char * dest, uint32_t destLen) {
+  if (srcLen != destLen) {
+    return false;
+  }
+  // The lengths are equal here, so either one is the number of bytes to compare.
+  return fmemeq(src, dest, srcLen);
+}
+
+/**
+ * Fast memory equal, reverse order: same result as a byte-wise equality test
+ * of two buffers of the same length, but the bytes at the end of the buffers
+ * are examined first and the scan moves towards the front.
+ *
+ * Words are read with fixed-size memcpy, so the buffers need no particular
+ * alignment.
+ *
+ * @param src: first buffer, at least len readable bytes
+ * @param dest: second buffer, at least len readable bytes
+ * @param len: number of bytes to compare
+ * @return true if the first len bytes of both buffers are identical
+ */
+inline bool frmemeq(const char * src, const char * dest, uint32_t len) {
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  if (len < 4) {
+    // 0..3 bytes: plain byte comparison, last byte first.
+    for (uint32_t i = len; i > 0; i--) {
+      if (src8[i - 1] != dest8[i - 1]) {
+        return false;
+      }
+    }
+    return true;
+  }
+  if (len < 8) {
+    // 4..7 bytes: trailing 4-byte group first, then the leading one.
+    uint32_t l;
+    uint32_t r;
+    memcpy(&l, src8 + len - 4, 4);
+    memcpy(&r, dest8 + len - 4, 4);
+    if (l != r) {
+      return false;
+    }
+    memcpy(&l, src8, 4);
+    memcpy(&r, dest8, 4);
+    return l == r;
+  }
+  uint64_t l;
+  uint64_t r;
+  // Step back from the last 8 bytes in 8-byte strides. The offset is kept in
+  // 64 bits so that it stays non-negative for every uint32_t length.
+  for (int64_t cur = (int64_t)len - 8; cur > 0; cur -= 8) {
+    memcpy(&l, src8 + cur, 8);
+    memcpy(&r, dest8 + cur, 8);
+    if (l != r) {
+      return false;
+    }
+  }
+  // The first 8 bytes; may overlap the last stride of the loop above.
+  memcpy(&l, src8, 8);
+  memcpy(&r, dest8, 8);
+  return l == r;
+}
+
+/**
+ * Tells whether two buffers, each with its own length, are identical,
+ * examining the bytes in reverse order. Buffers of different lengths are
+ * never equal; otherwise the bytes are compared with the three-argument
+ * frmemeq.
+ *
+ * @param src: first buffer
+ * @param dest: second buffer
+ * @param srcLen: length of src in bytes
+ * @param destLen: length of dest in bytes
+ * @return true if both lengths and all bytes match
+ */
+inline bool frmemeq(const char * src, const char * dest, uint32_t srcLen, uint32_t destLen) {
+  if (srcLen != destLen) {
+    return false;
+  }
+  // The lengths are equal here, so either one is the number of bytes to compare.
+  return frmemeq(src, dest, srcLen);
+}
+
+#endif /* PRIMITIVES_H_ */