You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/11/23 09:41:10 UTC
svn commit: r1544761 [2/3] - in /hama/trunk: ./ c++/ c++/src/
c++/src/main/native/examples/ c++/src/main/native/examples/conf/
c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/
c++/src/main/native/pipes/api/hama/ c++/src/main/nativ...
Added: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1544761&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (added)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Sat Nov 23 08:41:09 2013
@@ -0,0 +1,1356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/********************************************/
+/*********** BinaryUpwardProtocol ***********/
+/********************************************/
+class BinaryUpwardProtocol: public UpwardProtocol<BinaryUpwardProtocol> {
+private:
+  // Stream used for every outgoing command; allocated in the constructor
+  // and deleted in the destructor.
+  FileOutStream* out_stream_;
+public:
+  /**
+   * Opens a FileOutStream on the given FILE handle.
+   * HADOOP_ASSERT fails with "problem opening stream" if open() reports failure.
+   */
+  BinaryUpwardProtocol(FILE* out_stream) {
+    out_stream_ = new FileOutStream();
+    HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
+  }
+
+  /* local function */
+  /**
+   * Writes the command id and, if requested, flushes the stream.
+   * The templated overloads below call this with flush == false and flush
+   * once themselves after the payload has been written.
+   */
+  void sendCommand(int32_t cmd, bool flush) {
+    serializeInt(cmd, *out_stream_);
+    if (flush) {
+      out_stream_->flush();
+    }
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
+              messageTypeNames[cmd]);
+    }
+  }
+
+  /**
+   * Sends a command followed by a single serialized value, then flushes.
+   */
+  template<class T>
+  void sendCommand(int32_t cmd, T value) {
+    sendCommand(cmd, false);
+    // Write out generic value
+    serialize<T>(value, *out_stream_);
+    out_stream_->flush();
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: '%s'\n",
+              messageTypeNames[cmd], toString<T>(value).c_str());
+    }
+  }
+
+  /**
+   * Sends a command followed by `size` serialized values, then flushes.
+   * No element count is written; only the values themselves.
+   */
+  template<class T>
+  void sendCommand(int32_t cmd, const T values[], int size) {
+    sendCommand(cmd, false);
+    // Write out generic values
+    for (int i=0; i<size; i++) {
+      serialize<T>(values[i], *out_stream_);
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+                messageTypeNames[cmd], i+1, toString<T>(values[i]).c_str());
+      }
+    }
+    out_stream_->flush();
+  }
+
+  /**
+   * Sends a command followed by two serialized values (value1, then value2),
+   * then flushes.
+   */
+  template<class T1, class T2>
+  void sendCommand(int32_t cmd, T1 value1, T2 value2) {
+    sendCommand(cmd, false);
+    // Write out generic value1
+    serialize<T1>(value1, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param1: '%s'\n",
+              messageTypeNames[cmd], toString<T1>(value1).c_str());
+    }
+    // Write out generic value2
+    serialize<T2>(value2, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param2: '%s'\n",
+              messageTypeNames[cmd], toString<T2>(value2).c_str());
+    }
+    out_stream_->flush();
+  }
+
+  /**
+   * Sends a command followed by one leading value and then `size` further
+   * values, then flushes. The leading value is logged as Param0, the array
+   * elements as Param1..ParamN.
+   */
+  template<class T1, class T2>
+  void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
+    sendCommand(cmd, false);
+    // Write out generic value
+    serialize<T1>(value, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+              messageTypeNames[cmd], 0, toString<T1>(value).c_str());
+    }
+    // Write out generic values
+    for (int i=0; i<size; i++) {
+      serialize<T2>(values[i], *out_stream_);
+      if(logging) {
+        // Log the element that was just written (not the leading T1 value).
+        fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+                messageTypeNames[cmd], i+1, toString<T2>(values[i]).c_str());
+      }
+    }
+    out_stream_->flush();
+  }
+
+  /**
+   * Sends a command without payload and flushes immediately.
+   */
+  virtual void sendCommand(int32_t cmd) {
+    sendCommand(cmd, true);
+  }
+
+  /*
+  virtual void registerCounter(int id, const string& group,
+                               const string& name) {
+    serializeInt(REGISTER_COUNTER, *stream);
+    serializeInt(id, *stream);
+    serializeString(group, *stream);
+    serializeString(name, *stream);
+  }
+
+  virtual void incrementCounter(const TaskContext::Counter* counter,
+                                uint64_t amount) {
+    serializeInt(INCREMENT_COUNTER, *stream);
+    serializeInt(counter->getId(), *stream);
+    serializeLong(amount, *stream);
+  }
+  */
+
+  /**
+   * Sends INCREMENT_COUNTER followed by group, name and amount, then flushes.
+   */
+  virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+    serializeInt(INCREMENT_COUNTER, *out_stream_);
+    serializeString(group, *out_stream_);
+    serializeString(name, *out_stream_);
+    serializeLong(amount, *out_stream_);
+    out_stream_->flush();
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
+    }
+  }
+
+  /** Deletes the output stream object created in the constructor. */
+  ~BinaryUpwardProtocol() {
+    delete out_stream_;
+  }
+};
+
+/********************************************/
+/************** BinaryProtocol **************/
+/********************************************/
+template<class K1, class V1>
+class BinaryProtocol: public Protocol< BinaryProtocol<K1,V1> > {
+private:
+ FileInStream* in_stream_;
+ DownwardProtocol<K1,V1>* handler_;
+ BinaryUpwardProtocol* uplink_;
+
+public:
+ /**
+  * Wires up both directions of the binary protocol: a FileInStream on
+  * in_stream for incoming commands and a BinaryUpwardProtocol on uplink
+  * for outgoing ones. Incoming commands are dispatched to handler.
+  */
+ BinaryProtocol(FILE* in_stream, DownwardProtocol<K1,V1>* handler, FILE* uplink) {
+   handler_ = handler;
+
+   FileInStream* input = new FileInStream();
+   input->open(in_stream);
+   in_stream_ = input;
+
+   uplink_ = new BinaryUpwardProtocol(uplink);
+ }
+
+ /**
+  * Returns the upward protocol object created in the constructor.
+  * The object is deleted by this class's destructor.
+  */
+ UpwardProtocol<BinaryUpwardProtocol>* getUplink() {
+ return uplink_;
+ }
+
+ /**
+  * Wait for next event, but don't expect a response for
+  * a previously sent command.
+  *
+  * Reads one command id from the input stream, deserializes its arguments
+  * and forwards them to the matching DownwardProtocol handler method.
+  * An unknown command id is reported on stderr and then fails a
+  * HADOOP_ASSERT.
+  */
+ void nextEvent() {
+   // read command
+   int32_t cmd;
+   cmd = deserializeInt(*in_stream_);
+
+   switch (cmd) {
+
+     case START_MESSAGE: {
+       int32_t protocol_version;
+       protocol_version = deserialize<int32_t>(*in_stream_);
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n",
+                 protocol_version);
+       }
+       handler_->start(protocol_version);
+       break;
+     }
+     // setup BSP Job Configuration
+     case SET_BSPJOB_CONF: {
+       int32_t entries;
+       entries = deserialize<int32_t>(*in_stream_);
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",
+                 entries);
+       }
+       // Each entry is a key followed by a value, so 2*entries strings follow.
+       // Start from an empty vector: pre-sizing it and then calling push_back
+       // would prepend 2*entries empty strings to the real data.
+       const int32_t item_count = entries * 2;
+       vector<string> properties;
+       if (item_count > 0) {
+         properties.reserve(item_count);
+       }
+       for(int32_t i=0; i < item_count; ++i) {
+         string item;
+         item = deserialize<string>(*in_stream_);
+         properties.push_back(item);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",
+                   item.c_str());
+         }
+       }
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",
+                 entries);
+       }
+       handler_->setBSPJob(properties);
+       break;
+     }
+     case SET_INPUT_TYPES: {
+       string key_type;
+       string value_type;
+       key_type = deserialize<string>(*in_stream_);
+       value_type = deserialize<string>(*in_stream_);
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+                 key_type.c_str(), value_type.c_str());
+       }
+       handler_->setInputTypes(key_type, value_type);
+       break;
+     }
+     case RUN_SETUP: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
+       }
+       // Read into locals first: the wire order is input flag, then output flag.
+       int32_t piped_input;
+       int32_t piped_output;
+       piped_input = deserialize<int32_t>(*in_stream_);
+       piped_output = deserialize<int32_t>(*in_stream_);
+       handler_->runSetup(piped_input, piped_output);
+       break;
+     }
+     case RUN_BSP: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
+       }
+       int32_t piped_input;
+       int32_t piped_output;
+       piped_input = deserialize<int32_t>(*in_stream_);
+       piped_output = deserialize<int32_t>(*in_stream_);
+       handler_->runBsp(piped_input, piped_output);
+       break;
+     }
+     case RUN_CLEANUP: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
+       }
+       int32_t piped_input;
+       int32_t piped_output;
+       piped_input = deserialize<int32_t>(*in_stream_);
+       piped_output = deserialize<int32_t>(*in_stream_);
+       handler_->runCleanup(piped_input, piped_output);
+       break;
+     }
+     case PARTITION_REQUEST: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
+       }
+
+       K1 partion_key;
+       V1 partion_value;
+       int32_t num_tasks;
+
+       partion_key = deserialize<K1>(*in_stream_);
+       partion_value = deserialize<V1>(*in_stream_);
+       num_tasks = deserialize<int32_t>(*in_stream_);
+
+       handler_->runPartition(partion_key, partion_value, num_tasks);
+       break;
+     }
+     case CLOSE: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+       }
+       handler_->close();
+       break;
+     }
+     case ABORT: {
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+       }
+       handler_->abort();
+       break;
+     }
+     default: {
+       // Print the diagnostic first: the failing assert does not return.
+       fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",
+               cmd);
+       HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+     }
+   }
+ }
+
+ /**
+  * Wait for next event, which should be a response for
+  * a previously sent command (expected_response_cmd)
+  * and return the generic result.
+  *
+  * Reads one command id; if it equals expected_response_cmd and is one of
+  * the handled response types, a single value of type T is deserialized
+  * and returned. If the ids differ, the mismatch is reported on stderr and
+  * a HADOOP_ASSERT fails. A matching id that is not handled below yields a
+  * value-initialized T.
+  */
+ template<class T>
+ T getResult(int32_t expected_response_cmd) {
+
+   T result = T();
+
+   // read response command
+   int32_t cmd;
+   cmd = deserializeInt(*in_stream_);
+
+   // check if response is expected
+   if (expected_response_cmd == cmd) {
+
+     switch (cmd) {
+
+       case GET_MSG_COUNT: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msg_count: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case GET_MSG: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case GET_PEERNAME: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case GET_PEER_INDEX: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peer_index: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case GET_PEER_COUNT: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peer_count: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case GET_SUPERSTEP_COUNT: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstep_count: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+
+       case SEQFILE_OPEN: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN file_id: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case SEQFILE_APPEND: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+       case SEQFILE_CLOSE: {
+         result = deserialize<T>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: '%s'\n",
+                   toString<T>(result).c_str());
+         }
+         return result;
+       }
+     }
+     // Not expected response
+   } else {
+
+     /*
+     case CLOSE: {
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+       handler_->close();
+       break;
+     }
+     case ABORT: {
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+       handler_->abort();
+       break;
+     }
+     */
+     // Print the diagnostic first: the failing assert does not return.
+     fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+             expected_response_cmd, cmd);
+     HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+   }
+   return result;
+ }
+
+ /**
+  * Wait for next event, which should be a response for
+  * a previously sent command (expected_response_cmd)
+  * and return the generic vector result list.
+  *
+  * Only GET_ALL_PEERNAME is handled: an int32 count followed by that many
+  * values of type T. A response id different from expected_response_cmd is
+  * reported on stderr and fails a HADOOP_ASSERT.
+  */
+ template<class T>
+ vector<T> getVectorResult(int32_t expected_response_cmd) {
+
+   vector<T> results;
+
+   // read response command
+   int32_t cmd;
+   cmd = deserializeInt(*in_stream_);
+
+   // check if response is expected
+   if (expected_response_cmd == cmd) {
+
+     switch (cmd) {
+       case GET_ALL_PEERNAME: {
+         int32_t peername_count = deserialize<int32_t>(*in_stream_);
+         if(logging) {
+           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername_count: %d\n",
+                   peername_count);
+         }
+         if (peername_count > 0) {
+           results.reserve(peername_count);
+         }
+         for (int i=0; i<peername_count; i++) {
+           T peername = deserialize<T>(*in_stream_);
+           results.push_back(peername);
+           if(logging) {
+             fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: '%s'\n",
+                     toString<T>(peername).c_str());
+           }
+         }
+         return results;
+       }
+     }
+   } else {
+     // Print the diagnostic first: the failing assert does not return.
+     fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+             expected_response_cmd, cmd);
+     HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+   }
+   return results;
+ }
+
+ /**
+  * Reads the reply to a previously sent key/value request.
+  *
+  * Accepts either the expected response id or END_OF_DATA. For
+  * READ_KEYVALUE and SEQFILE_READNEXT a key and a value are deserialized
+  * (in that order) and returned; END_OF_DATA yields a pair built with
+  * KeyValuePair<K,V>(true). Any other id is reported on stderr and then
+  * fails a HADOOP_ASSERT.
+  */
+ template <class K, class V>
+ KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
+
+   KeyValuePair<K,V> key_value_pair;
+
+   // read response command
+   int32_t cmd = deserializeInt(*in_stream_);
+
+   // neither the expected response nor END_OF_DATA: report and fail
+   const bool acceptable = (cmd == expected_response_cmd) || (cmd == END_OF_DATA);
+   if (!acceptable) {
+     key_value_pair = KeyValuePair<K,V>(true);
+     fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(expected_cmd = %d) - Unknown binary command: %d\n",
+             expected_response_cmd, cmd);
+     fprintf(stderr,"ERORR: Please verfiy serialization! The key or value type could possibly not be deserialized!\n");
+     HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+     return key_value_pair;
+   }
+
+   switch (cmd) {
+
+     case READ_KEYVALUE: {
+       K key = deserialize<K>(*in_stream_);
+       V value = deserialize<V>(*in_stream_);
+
+       if(logging) {
+         // log at most the first nine characters of key and value
+         string k = toString<K>(key);
+         string v = toString<V>(value);
+         string k_shown = (k.length()<10) ? k : k.substr(0,9).append("...");
+         string v_shown = (v.length()<10) ? v : v.substr(0,9).append("...");
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: '%s' value: '%s'\n",
+                 k_shown.c_str(), v_shown.c_str());
+       }
+
+       key_value_pair = pair<K,V>(key, value);
+       return key_value_pair;
+     }
+     case SEQFILE_READNEXT: {
+       K key = deserialize<K>(*in_stream_);
+       V value = deserialize<V>(*in_stream_);
+
+       if(logging) {
+         // log at most the first nine characters of key and value
+         string k = toString<K>(key);
+         string v = toString<V>(value);
+         string k_shown = (k.length()<10) ? k : k.substr(0,9).append("...");
+         string v_shown = (v.length()<10) ? v : v.substr(0,9).append("...");
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: '%s' value: '%s'\n",
+                 k_shown.c_str(), v_shown.c_str());
+       }
+
+       key_value_pair = pair<K,V>(key, value);
+       return key_value_pair;
+     }
+     case END_OF_DATA: {
+       key_value_pair = KeyValuePair<K,V>(true);
+       if(logging) {
+         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got END_OF_DATA\n");
+       }
+     }
+   }
+   return key_value_pair;
+ }
+
+ /**
+  * Deletes the input stream and the uplink created in the constructor,
+  * and also the handler that was passed to the constructor.
+  * NOTE(review): handler_ is not allocated by this class; confirm that the
+  * code which supplied it does not delete it a second time.
+  */
+ virtual ~BinaryProtocol() {
+ delete in_stream_;
+ delete uplink_;
+ delete handler_;
+ }
+};
+
+/********************************************/
+/************** BSPContextImpl **************/
+/********************************************/
+template<class K1, class V1, class K2, class V2, class M>
+class BSPContextImpl: public BSPContext<K1, V1, K2, V2, M>, public DownwardProtocol<K1, V1> {
+private:
+ const Factory<K1, V1, K2, V2, M>* factory_;
+ BSPJob* job_;
+ BSP<K1, V1, K2, V2, M>* bsp_;
+ Partitioner<K1, V1, K2, V2, M>* partitioner_;
+ RecordReader<K1, V1>* reader_;
+ RecordWriter<K2, V2>* writer_;
+ Protocol< BinaryProtocol<K1,V1> >* protocol_;
+ UpwardProtocol<BinaryUpwardProtocol>* uplink_;
+
+ bool done_;
+ bool has_task_;
+ pthread_mutex_t mutex_done_;
+ std::vector<int> registered_counter_ids_;
+
+ pair<string, string> inputClass_;
+ //string* inputSplit_;
+
+public:
+
+ /**
+  * Creates a context bound to the given factory. All collaborators
+  * (job, bsp, partitioner, reader, writer, protocol, uplink) start out as
+  * NULL; protocol and uplink are supplied later through setProtocol().
+  * The factory is stored by address, so it has to outlive this context.
+  */
+ BSPContextImpl(const Factory<K1, V1, K2, V2, M>& factory)
+   : factory_(&factory),
+     job_(NULL),
+     bsp_(NULL),
+     partitioner_(NULL),
+     reader_(NULL),
+     writer_(NULL),
+     protocol_(NULL),
+     uplink_(NULL),   // previously left uninitialised
+     done_(false),
+     has_task_(false) {
+
+   pthread_mutex_init(&mutex_done_, NULL);
+
+   //inputSplit_ = NULL;
+ }
+
+
+ /********************************************/
+ /*********** DownwardProtocol IMPL **********/
+ /********************************************/
+ /**
+  * Accepts only protocol version 0 (anything else throws Error) and then
+  * asks the factory for a partitioner.
+  */
+ virtual void start(int protocol_version) {
+   const bool supported = (protocol_version == 0);
+   if (!supported) {
+     throw Error("Protocol version " + toString(protocol_version) +
+                 " not supported");
+   }
+
+   partitioner_ = factory_->createPartitioner(*this);
+ }
+
+ /**
+  * Builds the job configuration from a flat list of alternating keys and
+  * values (key0, value0, key1, value1, ...).
+  * A HADOOP_ASSERT fails if the list has odd length. A configuration set
+  * by an earlier call is released before the new one is stored.
+  */
+ virtual void setBSPJob(vector<string> values) {
+   const size_t len = values.size();
+   // Validate before allocating so a failing assert cannot leak the job.
+   HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
+
+   BSPJobImpl* result = new BSPJobImpl();
+   for(size_t i=0; i < len; i += 2) {
+     result->set(values[i], values[i+1]);
+   }
+
+   // job_ is deleted in the destructor, so this object owns it.
+   delete job_;
+   job_ = result;
+ }
+
+ /**
+  * Remembers the input key/value class names; they are returned by
+  * getInputKeyClass() and getInputValueClass().
+  */
+ virtual void setInputTypes(string key_type, string value_type) {
+   pair<string,string> types(key_type, value_type);
+   inputClass_ = types;
+ }
+
+ /* local method */
+ /**
+  * Asks the factory for a RecordReader / RecordWriter the first time the
+  * corresponding piped flag is set and none has been created yet.
+  * NOTE(review): inside each branch the piped flag is true, so the asserts
+  * only pass when the factory returned NULL - confirm this is intended.
+  */
+ void setupReaderWriter(bool piped_input, bool piped_output) {
+
+   if(logging) {
+     const char* input_text = (piped_input)?"true":"false";
+     const char* output_text = (piped_output)?"true":"false";
+     fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+             input_text, output_text);
+   }
+
+   const bool create_reader = piped_input && (reader_ == NULL);
+   if (create_reader) {
+     reader_ = factory_->createRecordReader(*this);
+     HADOOP_ASSERT((reader_ == NULL) == piped_input,
+                   piped_input ? "RecordReader defined when not needed.":
+                   "RecordReader not defined");
+
+     //if (reader != NULL) {
+     //  value = new string();
+     //}
+   }
+
+   const bool create_writer = piped_output && (writer_ == NULL);
+   if (create_writer) {
+     writer_ = factory_->createRecordWriter(*this);
+     HADOOP_ASSERT((writer_ == NULL) == piped_output,
+                   piped_output ? "RecordWriter defined when not needed.":
+                   "RecordWriter not defined");
+   }
+ }
+
+ /**
+  * Runs the user's BSP setup phase: prepares reader/writer, creates the
+  * BSP object on first use, calls setup() and reports TASK_DONE.
+  * Nothing is reported if the factory yields no BSP object.
+  */
+ virtual void runSetup(bool piped_input, bool piped_output) {
+   setupReaderWriter(piped_input, piped_output);
+
+   if (bsp_ == NULL) {
+     bsp_ = factory_->createBSP(*this);
+   }
+   if (bsp_ == NULL) {
+     return;
+   }
+
+   // has_task_ is true only while user code is running
+   has_task_ = true;
+   bsp_->setup(*this);
+   has_task_ = false;
+   uplink_->sendCommand(TASK_DONE);
+ }
+
+ /**
+  * Runs the user's bsp() phase: prepares reader/writer, creates the BSP
+  * object on first use, calls bsp() and reports TASK_DONE.
+  * Nothing is reported if the factory yields no BSP object.
+  */
+ virtual void runBsp(bool piped_input, bool piped_output) {
+   setupReaderWriter(piped_input, piped_output);
+
+   if (bsp_ == NULL) {
+     bsp_ = factory_->createBSP(*this);
+   }
+   if (bsp_ == NULL) {
+     return;
+   }
+
+   // has_task_ is true only while user code is running
+   has_task_ = true;
+   bsp_->bsp(*this);
+   has_task_ = false;
+   uplink_->sendCommand(TASK_DONE);
+ }
+
+ /**
+  * Runs the user's cleanup phase on an already existing BSP object and
+  * reports TASK_DONE. Unlike runSetup/runBsp no BSP object is created here;
+  * without one only the reader/writer preparation takes place.
+  */
+ virtual void runCleanup(bool piped_input, bool piped_output) {
+   setupReaderWriter(piped_input, piped_output);
+
+   if (bsp_ == NULL) {
+     return;
+   }
+
+   // has_task_ is true only while user code is running
+   has_task_ = true;
+   bsp_->cleanup(*this);
+   has_task_ = false;
+   uplink_->sendCommand(TASK_DONE);
+ }
+
+ /********************************************/
+ /******* Partitioner *******/
+ /********************************************/
+ /**
+  * Computes the partition for a key/value pair with the user's partitioner
+  * and sends it back as PARTITION_RESPONSE. Without a partitioner nothing
+  * is sent; the situation is only logged.
+  */
+ virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) {
+   if (partitioner_ == NULL) {
+     if(logging) {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+     }
+     return;
+   }
+
+   int part = partitioner_->partition(key, value, num_tasks);
+   uplink_->sendCommand<int32_t>(PARTITION_RESPONSE, part);
+ }
+
+ /**
+  * Marks the context as finished: sets done_ and clears has_task_ while
+  * holding mutex_done_, then logs both flags.
+  */
+ virtual void close() {
+   pthread_mutex_lock(&mutex_done_);
+   done_ = true;
+   has_task_ = false;
+   pthread_mutex_unlock(&mutex_done_);
+
+   if(logging) {
+     const char* done_text = (done_)?"true":"false";
+     const char* task_text = (has_task_)?"true":"false";
+     fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+             done_text, task_text);
+   }
+ }
+
+ /**
+  * Always throws Error("Aborted by driver").
+  * BinaryProtocol::nextEvent calls abort() on its handler when it reads an
+  * ABORT command.
+  */
+ virtual void abort() {
+ throw Error("Aborted by driver");
+ }
+
+ /********************************************/
+ /************** TaskContext IMPL ************/
+ /********************************************/
+
+ /**
+  * Get the BSPJob for the current task.
+  * @return the job stored by setBSPJob(), or NULL if none has been set yet.
+  *         The object is deleted by this context's destructor.
+  */
+ virtual const BSPJob* getBSPJob() {
+ return job_;
+ }
+
+ /**
+ * Get the current key.
+ * @return the current key or NULL if called before the first map or reduce
+ */
+ //virtual const string& getInputKey() {
+ // return key;
+ //}
+
+ /**
+ * Get the current value.
+ * @return the current value or NULL if called before the first map or
+ * reduce
+ */
+ //virtual const string& getInputValue() {
+ // return *value;
+ //}
+
+ /**
+ * Register a counter with the given group and name.
+ */
+ /*
+ virtual Counter* getCounter(const std::string& group,
+ const std::string& name) {
+ int id = registeredCounterIds.size();
+ registeredCounterIds.push_back(id);
+ uplink->registerCounter(id, group, name);
+ return new Counter(id);
+ }*/
+
+ /**
+  * Increment the counter identified by group and name by the given amount.
+  * The request is forwarded unchanged to the uplink.
+  */
+ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+ uplink_->incrementCounter(group, name, amount);
+ }
+
+ /********************************************/
+ /************** BSPContext IMPL *************/
+ /********************************************/
+
+ /**
+ * Access the InputSplit of the bsp.
+ */
+ //virtual const string& getInputSplit() {
+ // return *inputSplit_;
+ //}
+
+ /**
+  * Get the name of the key class of the input to this task.
+  * @return the key type stored by setInputTypes(); empty if it was never called.
+  */
+ virtual string getInputKeyClass() {
+ return inputClass_.first;
+ }
+
+ /**
+  * Get the name of the value class of the input to this task.
+  * @return the value type stored by setInputTypes(); empty if it was never called.
+  */
+ virtual string getInputValueClass() {
+ return inputClass_.second;
+ }
+
+ /**
+  * Send a message to the peer identified by peer_name.
+  * Messages sent by this method are not guaranteed to be received in the
+  * order in which they were sent.
+  * The peer name and the message are written to the uplink as SEND_MSG;
+  * no reply is read here.
+  */
+ virtual void sendMessage(const string& peer_name, const M& msg) {
+ uplink_->sendCommand<string,M>(SEND_MSG, peer_name, msg);
+ }
+
+ /**
+  * Requests the next message with GET_MSG and waits for the reply.
+  * @return A message from the peer's received messages queue (a FIFO).
+  */
+ virtual M getCurrentMessage() {
+   uplink_->sendCommand(GET_MSG);
+
+   M received = protocol_->template getResult<M>(GET_MSG);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n",
+             toString<M>(received).c_str());
+   }
+
+   return received;
+ }
+
+ /**
+  * Requests the queue length with GET_MSG_COUNT and waits for the reply.
+  * @return The number of messages in the peer's received messages queue.
+  */
+ virtual int getNumCurrentMessages() {
+   uplink_->sendCommand(GET_MSG_COUNT);
+
+   int message_count = protocol_->template getResult<int32_t>(GET_MSG_COUNT);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getNumCurrentMessages - result: %d\n",
+             message_count);
+   }
+
+   return message_count;
+ }
+
+ /**
+  * Barrier Synchronization.
+  *
+  * Sends all the messages in the outgoing message queues to the corresponding
+  * remote peers.
+  *
+  * This method only writes the SYNC command to the uplink; it does not read
+  * a reply.
+  */
+ virtual void sync() {
+ uplink_->sendCommand(SYNC);
+ }
+
+ /**
+  * @return the name of this peer in the format "hostname:port".
+  */
+ virtual string getPeerName() {
+   // id=-1 asks for the own peername; the indexed overload sends the same
+   // GET_PEERNAME request and writes the same log line. The qualified call
+   // keeps this independent of overrides in derived classes.
+   return BSPContextImpl::getPeerName(-1);
+ }
+
+ /**
+  * Sends GET_PEERNAME with the given index and waits for the reply.
+  * @return the name of n-th peer from sorted array by name.
+  */
+ virtual string getPeerName(int index) {
+   uplink_->sendCommand<int32_t>(GET_PEERNAME, index);
+
+   string peer_name = protocol_->template getResult<string>(GET_PEERNAME);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - result: %s\n",
+             peer_name.c_str());
+   }
+
+   return peer_name;
+ }
+
+ /**
+  * Sends GET_ALL_PEERNAME and waits for the list reply.
+  * @return the names of all the peers executing tasks from the same job
+  * (including this peer).
+  */
+ virtual vector<string> getAllPeerNames() {
+   uplink_->sendCommand(GET_ALL_PEERNAME);
+   return protocol_->template getVectorResult<string>(GET_ALL_PEERNAME);
+ }
+
+ /**
+  * Sends GET_PEER_INDEX and waits for the reply.
+  * @return the index of this peer from sorted array by name.
+  */
+ virtual int getPeerIndex() {
+   uplink_->sendCommand(GET_PEER_INDEX);
+
+   int peer_index = protocol_->template getResult<int32_t>(GET_PEER_INDEX);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerIndex - result: %d\n",
+             peer_index);
+   }
+
+   return peer_index;
+ }
+
+ /**
+  * Sends GET_PEER_COUNT and waits for the reply.
+  * @return the number of peers
+  */
+ virtual int getNumPeers() {
+   uplink_->sendCommand(GET_PEER_COUNT);
+
+   int peer_count = protocol_->template getResult<int32_t>(GET_PEER_COUNT);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getNumPeers - result: %d\n",
+             peer_count);
+   }
+
+   return peer_count;
+ }
+
+ /**
+  * Sends GET_SUPERSTEP_COUNT and waits for the 64-bit reply, which is
+  * converted to long.
+  * @return the count of current super-step
+  */
+ virtual long getSuperstepCount() {
+   uplink_->sendCommand(GET_SUPERSTEP_COUNT);
+
+   long superstep = protocol_->template getResult<int64_t>(GET_SUPERSTEP_COUNT);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::getSuperstepCount - result: %ld\n",
+             superstep);
+   }
+
+   return superstep;
+ }
+
+ /**
+  * Clears all queue entries.
+  * Only the CLEAR command is written to the uplink; no reply is read.
+  */
+ virtual void clear() {
+ uplink_->sendCommand(CLEAR);
+ }
+
+ /**
+  * Writes a key/value pair to the output collector: through the local
+  * RecordWriter if one exists, otherwise as WRITE_KEYVALUE over the uplink.
+  */
+ virtual void write(const K2& key, const V2& value) {
+   if (writer_ == NULL) {
+     uplink_->sendCommand<K2,V2>(WRITE_KEYVALUE, key, value);
+     return;
+   }
+   writer_->emit(key, value);
+ }
+
+ /**
+  * Requests the next input record with READ_KEYVALUE and stores it in
+  * key and value.
+  * @return true if a record was read; false if the reply carried no data,
+  *         in which case key and value are left untouched.
+  */
+ virtual bool readNext(K1& key, V1& value) {
+
+   uplink_->sendCommand(READ_KEYVALUE);
+
+   // TODO
+   // check if value is array [0, 1, 2, ...], and remove brackets
+   /*
+   int len = current_value_.length();
+   if ( (current_value_[0]=='[') &&
+        (current_value_[len-1]==']') ) {
+     value = current_value_.substr(1,len-2);
+   } else {
+     value = current_value_;
+   }
+
+   if (logging && key.empty() && value.empty()) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
+   }
+
+   return (!key.empty() && !value.empty());
+   */
+
+   KeyValuePair<K1,V1> record;
+   record = protocol_->template getKeyValueResult<K1,V1>(READ_KEYVALUE);
+
+   if (record.is_empty) {
+     if (logging) {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n");
+     }
+   } else {
+     key = record.first;
+     value = record.second;
+   }
+
+   return (!record.is_empty);
+ }
+
+ /**
+  * Closes the input and opens it right away, so that the file pointer is at
+  * the beginning again.
+  * Only the REOPEN_INPUT command is written to the uplink here; no reply
+  * is read.
+  */
+ virtual void reopenInput() {
+ uplink_->sendCommand(REOPEN_INPUT);
+ }
+
+
+ /********************************************/
+ /******* SequenceFileConnector IMPL *******/
+ /********************************************/
+
+ /**
+  * Open a SequenceFile with option "r" or "w".
+  * Path, option, key type and value type are sent as SEQFILE_OPEN.
+  * @return the corresponding fileID from the reply, or -1 if option is
+  *         neither "r" nor "w" (nothing is sent in that case)
+  */
+ virtual int32_t sequenceFileOpen(const string& path, const string& option,
+                                  const string& key_type, const string& value_type) {
+   if (logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",
+             path.c_str());
+   }
+
+   const bool valid_option = (option.compare("r")==0) || (option.compare("w")==0);
+   if (!valid_option) {
+     //Error wrong option
+     fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",
+             option.c_str());
+     return -1;
+   }
+
+   string values[] = {path, option, key_type, value_type};
+   uplink_->sendCommand<string>(SEQFILE_OPEN, values, 4);
+
+   int file_id = protocol_->template getResult<int32_t>(SEQFILE_OPEN);
+   if(logging) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - result: %d\n",
+             file_id);
+   }
+   return file_id;
+ }
+
+ /**
+  * Close the SequenceFile with the given fileID.
+  * @return true if the reply to SEQFILE_CLOSE was 1
+  */
+ virtual bool sequenceFileClose(int32_t file_id) {
+   uplink_->sendCommand<int32_t>(SEQFILE_CLOSE, file_id);
+
+   int reply = protocol_->template getResult<int32_t>(SEQFILE_CLOSE);
+
+   if (logging) {
+     if (reply==0) {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
+     } else {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
+     }
+   }
+
+   return (reply==1);
+ }
+
+ /**
+  * Read the next key/value pair from the SequenceFile with the given fileID.
+  * Sends SEQFILE_READNEXT and waits for the reply.
+  * @return true if a pair was read; false if the reply carried no data,
+  *         in which case key and value are left untouched.
+  */
+ template<class K, class V>
+ bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
+
+   // send request
+   uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
+
+   // TODO
+   /*
+   // check if value is array [0, 1, 2, ...], and remove brackets
+   int len = current_value_.length();
+   if ( (current_value_[0]=='[') &&
+        (current_value_[len-1]==']') ) {
+     value = current_value_(1,len-2);
+   } else {
+     value = current_value_;
+   }
+   */
+
+   // get response
+   KeyValuePair<K,V> key_value_pair;
+   key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
+
+   if (!key_value_pair.is_empty) {
+     key = key_value_pair.first;
+     value = key_value_pair.second;
+   }
+
+   if (logging && key_value_pair.is_empty) {
+     // Name this method in the log line so it is not mistaken for readNext().
+     fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - END_OF_DATA\n");
+   }
+
+   return (!key_value_pair.is_empty);
+ }
+
+ /**
+  * Append a key/value pair to the SequenceFile with the given fileID.
+  * Key and value are copied into strings before being sent, so K and V
+  * have to be convertible to string.
+  * @return true if the reply to SEQFILE_APPEND was 1
+  */
+ template<class K, class V>
+ bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
+   string values[] = {key, value};
+   uplink_->sendCommand<int32_t,string>(SEQFILE_APPEND, file_id, values, 2);
+
+   int reply = protocol_->template getResult<int32_t>(SEQFILE_APPEND);
+
+   if (logging) {
+     if (reply==0) {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Nothing appended!\n");
+     } else {
+       fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Successfully appended!\n");
+     }
+   }
+
+   return (reply==1);
+ }
+
+ /********************************************/
+ /*************** Other STUFF ***************/
+ /********************************************/
+
+ /**
+  * Stores the protocol used to read replies and the uplink used to send
+  * commands. The constructor does not set these, so this has to be called
+  * before any method that talks to the parent process.
+  * Neither pointer is deleted by this class.
+  */
+ void setProtocol(Protocol< BinaryProtocol<K1,V1> >* protocol, UpwardProtocol<BinaryUpwardProtocol>* uplink) {
+ protocol_ = protocol;
+ uplink_ = uplink;
+ }
+
+ /**
+  * Returns a copy of the done flag, read while holding mutex_done_.
+  */
+ bool isDone() {
+   bool snapshot;
+   pthread_mutex_lock(&mutex_done_);
+   snapshot = done_;
+   pthread_mutex_unlock(&mutex_done_);
+   return snapshot;
+ }
+
+ /**
+ * Advance to the next value.
+ */
+ /*
+ bool nextValue() {
+ if (isNewKey || done) {
+ return false;
+ }
+ isNewValue = false;
+ //progress();
+ protocol->nextEvent();
+ return isNewValue;
+ }
+ */
+ /**
+  * Processes incoming protocol events until the context is done or a task
+  * is marked as running.
+  */
+ void waitForTask() {
+   for (;;) {
+     if (done_ || has_task_) {
+       break;
+     }
+     if(logging) {
+       const char* done_text = (done_)?"true":"false";
+       const char* task_text = (has_task_)?"true":"false";
+       fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+               done_text, task_text);
+     }
+     protocol_->nextEvent();
+   }
+ }
+ /*
+ bool nextKey() {
+ if (reader == NULL) {
+ while (!isNewKey) {
+ nextValue();
+ if (done) {
+ return false;
+ }
+ }
+ key = *newKey;
+ } else {
+ if (!reader->next(key, const_cast<string&>(*value))) {
+ pthread_mutex_lock(&mutexDone);
+ done = true;
+ pthread_mutex_unlock(&mutexDone);
+ return false;
+ }
+ //progressFloat = reader->getProgress();
+ }
+ isNewKey = false;
+
+ if (bsp != NULL) {
+ bsp->bsp(*this);
+ }
+ return true;
+ }
+ */
+ /**
+  * Calls close() on the reader, the BSP object and the writer, in that
+  * order, skipping any that were never created.
+  */
+ void closeAll() {
+   if (reader_ != NULL) {
+     reader_->close();
+   }
+   if (bsp_ != NULL) {
+     bsp_->close();
+   }
+   if (writer_ != NULL) {
+     writer_->close();
+   }
+ }
+
+ /**
+  * Releases every object this context obtained from the factory or built
+  * itself (job, reader, bsp, writer, partitioner) and destroys the mutex.
+  * protocol_ and uplink_ are not deleted here.
+  */
+ virtual ~BSPContextImpl() {
+   delete job_;
+   //delete inputSplit_;
+   //if (reader) {
+   //  delete value;
+   //}
+   delete reader_;
+   delete bsp_;
+   delete writer_;
+   // Created in start() via the factory like reader_/bsp_/writer_, so it is
+   // released the same way.
+   // NOTE(review): assumes Partitioner has a virtual destructor - confirm.
+   delete partitioner_;
+   pthread_mutex_destroy(&mutex_done_);
+ }
+};
+
+/**
+ * Ping the parent every 5 seconds to know if it is alive.
+ *
+ * Each cycle sleeps 5 seconds and, if the environment variable
+ * hama.pipes.command.port is set, opens a loopback TCP connection to that
+ * port and closes it again straight away. A successful cycle resets the
+ * retry budget; after MAX_RETRIES consecutive failed cycles the process
+ * exits with status 1. The loop ends as soon as the context reports done.
+ *
+ * @param ptr pointer to the BSPContextImpl<K1, V1, K2, V2, M> to watch
+ * @return always NULL
+ */
+template<class K1, class V1, class K2, class V2, class M>
+void* ping(void* ptr) {
+  BSPContextImpl<K1, V1, K2, V2, M>* context = (BSPContextImpl<K1, V1, K2, V2, M>*) ptr;
+  char* portStr = getenv("hama.pipes.command.port");
+  const int MAX_RETRIES = 3;
+  int remaining_retries = MAX_RETRIES;
+  while (!context->isDone()) {
+    try {
+      sleep(5);
+      if (portStr) {
+        int sock = socket(PF_INET, SOCK_STREAM, 0);
+        HADOOP_ASSERT(sock != - 1,
+                      string("problem creating socket: ") + strerror(errno));
+        // Value-initialise so that padding (sin_zero) is not left as garbage.
+        sockaddr_in addr = sockaddr_in();
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons(toInt(portStr));
+        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        if (connect(sock, (sockaddr*) &addr, sizeof(addr)) != 0) {
+          // Save errno first: close() may overwrite it. Closing here keeps a
+          // dead parent from leaking one descriptor per retry.
+          const int connect_errno = errno;
+          close(sock);
+          HADOOP_ASSERT(false,
+                        string("problem connecting command socket: ") +
+                        strerror(connect_errno));
+        }
+        // Only claim the connection once connect() has actually succeeded.
+        if(logging) {
+          fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n",
+                  portStr);
+        }
+        // Always close the descriptor, even when shutdown() reports an error,
+        // and only then raise the failure.
+        const int shutdown_result = shutdown(sock, SHUT_RDWR);
+        const int close_result = close(sock);
+        HADOOP_ASSERT(shutdown_result == 0, "problem shutting socket");
+        HADOOP_ASSERT(close_result == 0, "problem closing socket");
+      }
+      remaining_retries = MAX_RETRIES;
+    } catch (Error& err) {
+      if (!context->isDone()) {
+        fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
+                err.getMessage().c_str());
+        remaining_retries -= 1;
+        if (remaining_retries == 0) {
+          exit(1);
+        }
+      } else {
+        // Failures after the task has finished are expected; stop quietly.
+        return NULL;
+      }
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Run the assigned task in the framework.
+ * The user's main function should set the various functions using the
+ * set* functions above and then call this.
+ *
+ * The command channel is chosen from the environment: a loopback TCP socket
+ * when hama.pipes.command.port is set, otherwise the file named by
+ * hama.pipes.command.file (output is written to that name plus ".out").
+ * hama.pipes.logging must be set; a non-zero value enables logging.
+ *
+ * @param factory handed to the BSPContextImpl constructor
+ * @return true, if the task succeeded; false if no protocol could be
+ *         initialized or an Error was caught.
+ */
+template<class K1, class V1, class K2, class V2, class M>
+bool runTask(const Factory<K1, V1, K2, V2, M>& factory) {
+  try {
+    HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL, "No environment found!");
+
+    logging = (toInt(getenv("hama.pipes.logging")) != 0);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::runTask - logging is: %s\n",
+              ((logging)?"true":"false"));
+    }
+
+    BSPContextImpl<K1, V1, K2, V2, M>* context = new BSPContextImpl<K1, V1, K2, V2, M>(factory);
+    Protocol< BinaryProtocol<K1,V1> >* protocol = NULL;
+
+    char* port_str = getenv("hama.pipes.command.port");
+    int sock = -1;
+    FILE* in_stream = NULL;
+    FILE* out_stream = NULL;
+    char *bufin = NULL;
+    char *bufout = NULL;
+    if (port_str) {
+      sock = socket(PF_INET, SOCK_STREAM, 0);
+      HADOOP_ASSERT(sock != - 1,
+                    string("problem creating socket: ") + strerror(errno));
+      sockaddr_in addr;
+      addr.sin_family = AF_INET;
+      addr.sin_port = htons(toInt(port_str));
+      addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                    string("problem connecting command socket: ") +
+                    strerror(errno));
+
+      // Fail with a readable error instead of handing NULL streams on.
+      in_stream = fdopen(sock, "r");
+      HADOOP_ASSERT(in_stream != NULL,
+                    string("problem opening socket input stream: ") +
+                    strerror(errno));
+      out_stream = fdopen(sock, "w");
+      HADOOP_ASSERT(out_stream != NULL,
+                    string("problem opening socket output stream: ") +
+                    strerror(errno));
+
+      // increase buffer size
+      int bufsize = 128*1024;
+      int setbuf;
+      bufin = new char[bufsize];
+      bufout = new char[bufsize];
+      setbuf = setvbuf(in_stream, bufin, _IOFBF, bufsize);
+      HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for in_stream: ")
+                    + strerror(errno));
+      setbuf = setvbuf(out_stream, bufout, _IOFBF, bufsize);
+      HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for out_stream: ")
+                    + strerror(errno));
+
+      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+      if(logging) {
+        fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n",
+                port_str);
+      }
+
+    } else if (getenv("hama.pipes.command.file")) {
+      char* filename = getenv("hama.pipes.command.file");
+      string out_filename = filename;
+      out_filename += ".out";
+      in_stream = fopen(filename, "r");
+      HADOOP_ASSERT(in_stream != NULL,
+                    string("problem opening command file: ") + strerror(errno));
+      out_stream = fopen(out_filename.c_str(), "w");
+      HADOOP_ASSERT(out_stream != NULL,
+                    string("problem opening command output file: ") +
+                    strerror(errno));
+      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+    } else {
+      //protocol = new TextProtocol(stdin, context, stdout);
+      fprintf(stderr,"HamaPipes::runTask - Protocol couldn't be initialized!\n");
+      // This used to be "return -1", which converts to true in a bool
+      // function and so reported the failure as success.
+      return false;
+    }
+
+    context->setProtocol(protocol, protocol->getUplink());
+
+    // The ping thread is currently disabled.
+    //pthread_t pingThread;
+    //pthread_create(&pingThread, NULL, ping, (void*)(context));
+
+    context->waitForTask();
+
+    //while (!context->isDone()) {
+    //context->nextKey();
+    //}
+
+    context->closeAll();
+    protocol->getUplink()->sendCommand(DONE);
+
+    //pthread_join(pingThread,NULL);
+
+    // Cleanup
+    delete context;
+    delete protocol;
+
+    if (in_stream != NULL) {
+      fflush(in_stream);
+    }
+    if (out_stream != NULL) {
+      fflush(out_stream);
+    }
+
+    fflush(stdout);
+
+    if (sock != -1) {
+      int result = shutdown(sock, SHUT_RDWR);
+      HADOOP_ASSERT(result == 0, "problem shutting socket");
+      result = close(sock);
+      HADOOP_ASSERT(result == 0, "problem closing socket");
+    }
+
+    //TODO REFACTOR
+    // NOTE(review): the FILE streams are never fclose()d, yet the buffers
+    // given to setvbuf() are released below. setvbuf() requires a buffer to
+    // outlive its stream - confirm nothing touches the streams afterwards.
+    if (in_stream != NULL) {
+      //fclose(stream);
+    }
+    if (out_stream != NULL) {
+      //fclose(outStream);
+    }
+    // Allocated with new char[], so they must be released with delete[].
+    delete[] bufin;
+    delete[] bufout;
+
+    return true;
+
+  } catch (Error& err) {
+    fprintf(stderr, "Hama Pipes Exception: %s\n",
+            err.getMessage().c_str());
+    return false;
+  }
+}
Modified: hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/api/hadoop/SerialUtils.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh (original)
+++ hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh Sat Nov 23 08:41:09 2013
@@ -22,7 +22,7 @@
#include <stdint.h>
namespace HadoopUtils {
-
+
/**
* A simple exception class that records a message for the user.
*/
@@ -30,38 +30,38 @@ namespace HadoopUtils {
private:
std::string error;
public:
-
+
/**
* Create an error object with the given message.
*/
Error(const std::string& msg);
-
+
/**
* Construct an error object with the given message that was created on
* the given file, line, and functino.
*/
- Error(const std::string& msg,
+ Error(const std::string& msg,
const std::string& file, int line, const std::string& function);
-
+
/**
* Get the error message.
*/
const std::string& getMessage() const;
};
-
+
/**
* Check to make sure that the condition is true, and throw an exception
* if it is not. The exception will contain the message and a description
* of the source location.
*/
- #define HADOOP_ASSERT(CONDITION, MESSAGE) \
- { \
- if (!(CONDITION)) { \
- throw HadoopUtils::Error((MESSAGE), __FILE__, __LINE__, \
- __PRETTY_FUNCTION__); \
- } \
- }
-
+#define HADOOP_ASSERT(CONDITION, MESSAGE) \
+{ \
+if (!(CONDITION)) { \
+throw HadoopUtils::Error((MESSAGE), __FILE__, __LINE__, \
+__PRETTY_FUNCTION__); \
+} \
+}
+
/**
* An interface for an input stream.
*/
@@ -76,7 +76,7 @@ namespace HadoopUtils {
virtual void read(void *buf, size_t len) = 0;
virtual ~InStream() {}
};
-
+
/**
* An interface for an output stream.
*/
@@ -95,7 +95,7 @@ namespace HadoopUtils {
virtual void flush() = 0;
virtual ~OutStream() {}
};
-
+
/**
* A class to read a file as a stream.
*/
@@ -118,18 +118,18 @@ namespace HadoopUtils {
*/
bool isOwned;
};
-
+
/**
* A class to write a stream to a file.
*/
class FileOutStream: public OutStream {
public:
-
+
/**
* Create a stream that isn't bound to anything.
*/
FileOutStream();
-
+
/**
* Create the given file, potentially overwriting an existing file.
*/
@@ -144,7 +144,7 @@ namespace HadoopUtils {
FILE *mFile;
bool isOwned;
};
-
+
/**
* A stream that reads from a string.
*/
@@ -156,15 +156,17 @@ namespace HadoopUtils {
const std::string& buffer;
std::string::const_iterator itr;
};
-
+
void serializeInt(int32_t t, OutStream& stream);
int32_t deserializeInt(InStream& stream);
void serializeLong(int64_t t, OutStream& stream);
int64_t deserializeLong(InStream& stream);
void serializeFloat(float t, OutStream& stream);
float deserializeFloat(InStream& stream);
+ void serializeDouble(double t, OutStream& stream);
+ double deserializeDouble(InStream& stream);
void serializeString(const std::string& t, OutStream& stream);
- void deserializeString(std::string& t, InStream& stream);
+ std::string deserializeString(InStream& stream);
}
#endif
Modified: hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/SerialUtils.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc (original)
+++ hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc Sat Nov 23 08:41:09 2013
@@ -27,41 +27,41 @@
using std::string;
namespace HadoopUtils {
-
+
Error::Error(const std::string& msg): error(msg) {
}
-
- Error::Error(const std::string& msg,
- const std::string& file, int line,
+
+ Error::Error(const std::string& msg,
+ const std::string& file, int line,
const std::string& function) {
- error = msg + " at " + file + ":" + toString(line) +
- " in " + function;
+ error = msg + " at " + file + ":" + toString(line) +
+ " in " + function;
}
-
+
const std::string& Error::getMessage() const {
return error;
}
-
+
FileInStream::FileInStream()
{
mFile = NULL;
isOwned = false;
}
-
+
bool FileInStream::open(const std::string& name)
{
mFile = fopen(name.c_str(), "rb");
isOwned = true;
return (mFile != NULL);
}
-
+
bool FileInStream::open(FILE* file)
{
mFile = file;
isOwned = false;
return (mFile != NULL);
}
-
+
void FileInStream::read(void *buf, size_t len)
{
size_t result = fread(buf, len, 1, mFile);
@@ -73,12 +73,12 @@ namespace HadoopUtils {
}
}
}
-
+
bool FileInStream::skip(size_t nbytes)
{
return (0==fseek(mFile, nbytes, SEEK_CUR));
}
-
+
bool FileInStream::close()
{
int ret = 0;
@@ -88,20 +88,20 @@ namespace HadoopUtils {
mFile = NULL;
return (ret==0);
}
-
+
FileInStream::~FileInStream()
{
if (mFile != NULL) {
close();
}
}
-
+
FileOutStream::FileOutStream()
{
mFile = NULL;
isOwned = false;
}
-
+
bool FileOutStream::open(const std::string& name, bool overwrite)
{
if (!overwrite) {
@@ -115,26 +115,26 @@ namespace HadoopUtils {
isOwned = true;
return (mFile != NULL);
}
-
+
bool FileOutStream::open(FILE* file)
{
mFile = file;
isOwned = false;
return (mFile != NULL);
}
-
+
void FileOutStream::write(const void* buf, size_t len)
{
size_t result = fwrite(buf, len, 1, mFile);
HADOOP_ASSERT(result == 1,
string("write error to file: ") + strerror(errno));
}
-
+
bool FileOutStream::advance(size_t nbytes)
{
return (0==fseek(mFile, nbytes, SEEK_CUR));
}
-
+
bool FileOutStream::close()
{
int ret = 0;
@@ -144,23 +144,23 @@ namespace HadoopUtils {
mFile = NULL;
return (ret == 0);
}
-
+
void FileOutStream::flush()
{
fflush(mFile);
}
-
+
FileOutStream::~FileOutStream()
{
if (mFile != NULL) {
close();
}
}
-
+
StringInStream::StringInStream(const std::string& str): buffer(str) {
itr = buffer.begin();
}
-
+
void StringInStream::read(void *buf, size_t buflen) {
size_t bytes = 0;
char* output = (char*) buf;
@@ -174,11 +174,11 @@ namespace HadoopUtils {
}
HADOOP_ASSERT(bytes == buflen, "unexpected end of string reached");
}
-
+
void serializeInt(int32_t t, OutStream& stream) {
serializeLong(t,stream);
}
-
+
void serializeLong(int64_t t, OutStream& stream)
{
if (t >= -112 && t <= 127) {
@@ -186,22 +186,22 @@ namespace HadoopUtils {
stream.write(&b, 1);
return;
}
-
+
int8_t len = -112;
if (t < 0) {
t ^= -1ll; // reset the sign bit
len = -120;
}
-
+
uint64_t tmp = t;
while (tmp != 0) {
tmp = tmp >> 8;
len--;
}
-
- stream.write(&len, 1);
+
+ stream.write(&len, 1);
len = (len < -120) ? -(len + 120) : -(len + 112);
-
+
for (uint32_t idx = len; idx != 0; idx--) {
uint32_t shiftbits = (idx - 1) * 8;
uint64_t mask = 0xFFll << shiftbits;
@@ -209,11 +209,11 @@ namespace HadoopUtils {
stream.write(&b, 1);
}
}
-
+
int32_t deserializeInt(InStream& stream) {
return deserializeLong(stream);
}
-
+
int64_t deserializeLong(InStream& stream)
{
int8_t b;
@@ -242,7 +242,7 @@ namespace HadoopUtils {
}
return t;
}
-
+
void serializeFloat(float t, OutStream& stream)
{
char buf[sizeof(float)];
@@ -251,16 +251,38 @@ namespace HadoopUtils {
xdr_float(&xdrs, &t);
stream.write(buf, sizeof(float));
}
-
- void deserializeFloat(float& t, InStream& stream)
+
+ float deserializeFloat(InStream& stream)
{
+ float t;
char buf[sizeof(float)];
stream.read(buf, sizeof(float));
XDR xdrs;
xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
xdr_float(&xdrs, &t);
+ return t;
}
-
+
+ /**
+  * XDR-encodes a double into a temporary buffer and writes the
+  * sizeof(double) encoded bytes to the stream.
+  */
+ void serializeDouble(double t, OutStream& stream)
+ {
+   char encoded[sizeof(double)];
+   XDR encoder;
+   xdrmem_create(&encoder, encoded, sizeof(double), XDR_ENCODE);
+   xdr_double(&encoder, &t);
+   stream.write(encoded, sizeof(double));
+ }
+
+ /**
+  * Reads sizeof(double) bytes from the stream and XDR-decodes them.
+  *
+  * @return the decoded double
+  */
+ double deserializeDouble(InStream& stream)
+ {
+   double decoded;
+   char raw[sizeof(double)];
+   stream.read(raw, sizeof(double));
+   XDR decoder;
+   xdrmem_create(&decoder, raw, sizeof(double), XDR_DECODE);
+   xdr_double(&decoder, &decoded);
+   return decoded;
+ }
+
void serializeString(const std::string& t, OutStream& stream)
{
serializeInt(t.length(), stream);
@@ -268,9 +290,10 @@ namespace HadoopUtils {
stream.write(t.data(), t.length());
}
}
-
- void deserializeString(std::string& t, InStream& stream)
+
+ std::string deserializeString(InStream& stream)
{
+ std::string t;
int32_t len = deserializeInt(stream);
if (len > 0) {
// resize the string to the right length
@@ -289,6 +312,6 @@ namespace HadoopUtils {
} else {
t.clear();
}
+ return t;
}
-
}
Modified: hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/StringUtils.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc (original)
+++ hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc Sat Nov 23 08:41:09 2013
@@ -32,14 +32,14 @@ using std::string;
using std::vector;
namespace HadoopUtils {
-
+
// Added by Apache Hama Pipes
string toString(double x) {
std::stringstream ss;
ss << std::setprecision(16) << x;
return ss.str();
}
-
+
// Added by Apache Hama Pipes
double toDouble(const string& val) {
const char* begin = val.c_str();
@@ -47,36 +47,36 @@ namespace HadoopUtils {
double result = strtod(begin, &end);
size_t s = end - begin;
if(s < val.size()) {
- throw Error("Problem converting "+val+" to double. (result:"
+ throw Error("Problem converting '"+val+"' to double. (result:"
+toString(result)+")");
}
return result;
}
-
+
string toString(int32_t x) {
char str[100];
sprintf(str, "%d", x);
return str;
}
-
+
int toInt(const string& val) {
int result;
char trash;
int num = sscanf(val.c_str(), "%d%c", &result, &trash);
HADOOP_ASSERT(num == 1,
- "Problem converting " + val + " to integer.");
+ "Problem converting '" + val + "' to integer.");
return result;
}
-
+
float toFloat(const string& val) {
float result;
char trash;
int num = sscanf(val.c_str(), "%f%c", &result, &trash);
HADOOP_ASSERT(num == 1,
- "Problem converting " + val + " to float.");
+ "Problem converting '" + val + "' to float.");
return result;
}
-
+
bool toBool(const string& val) {
if (val == "true") {
return true;
@@ -84,10 +84,10 @@ namespace HadoopUtils {
return false;
} else {
HADOOP_ASSERT(false,
- "Problem converting " + val + " to boolean.");
+ "Problem converting '" + val + "' to boolean.");
}
}
-
+
/**
* Get the current time in the number of milliseconds since 1970.
*/
@@ -98,15 +98,15 @@ namespace HadoopUtils {
HADOOP_ASSERT(sys != -1, strerror(errno));
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
-
+
vector<string> splitString(const std::string& str,
- const char* separator) {
+ const char* separator) {
vector<string> result;
string::size_type prev_pos=0;
string::size_type pos=0;
while ((pos = str.find_first_of(separator, prev_pos)) != string::npos) {
if (prev_pos < pos) {
- result.push_back(str.substr(prev_pos, pos-prev_pos));
+ result.push_back(str.substr(prev_pos, pos-prev_pos));
}
prev_pos = pos + 1;
}
@@ -115,7 +115,7 @@ namespace HadoopUtils {
}
return result;
}
-
+
string quoteString(const string& str,
const char* deliminators) {
@@ -123,31 +123,31 @@ namespace HadoopUtils {
for(int i=result.length() -1; i >= 0; --i) {
char ch = result[i];
if (!isprint(ch) ||
- ch == '\\' ||
+ ch == '\\' ||
strchr(deliminators, ch)) {
switch (ch) {
- case '\\':
- result.replace(i, 1, "\\\\");
- break;
- case '\t':
- result.replace(i, 1, "\\t");
- break;
- case '\n':
- result.replace(i, 1, "\\n");
- break;
- case ' ':
- result.replace(i, 1, "\\s");
- break;
- default:
- char buff[4];
- sprintf(buff, "\\%02x", static_cast<unsigned char>(result[i]));
- result.replace(i, 1, buff);
+ case '\\':
+ result.replace(i, 1, "\\\\");
+ break;
+ case '\t':
+ result.replace(i, 1, "\\t");
+ break;
+ case '\n':
+ result.replace(i, 1, "\\n");
+ break;
+ case ' ':
+ result.replace(i, 1, "\\s");
+ break;
+ default:
+ char buff[4];
+ sprintf(buff, "\\%02x", static_cast<unsigned char>(result[i]));
+ result.replace(i, 1, buff);
}
}
}
return result;
}
-
+
string unquoteString(const string& str) {
string result(str);
string::size_type current = result.find('\\');
@@ -158,8 +158,8 @@ namespace HadoopUtils {
if (isxdigit(result[current+1])) {
num_chars = 2;
HADOOP_ASSERT(current + num_chars < result.size(),
- "escape pattern \\<hex><hex> is missing second digit in '"
- + str + "'");
+ "escape pattern \\<hex><hex> is missing second digit in '"
+ + str + "'");
char sub_str[3];
sub_str[0] = result[current+1];
sub_str[1] = result[current+2];
@@ -167,27 +167,27 @@ namespace HadoopUtils {
char* end_ptr = NULL;
long int int_val = strtol(sub_str, &end_ptr, 16);
HADOOP_ASSERT(*end_ptr == '\0' && int_val >= 0,
- "escape pattern \\<hex><hex> is broken in '" + str + "'");
+ "escape pattern \\<hex><hex> is broken in '" + str + "'");
new_ch = static_cast<char>(int_val);
} else {
num_chars = 1;
switch(result[current+1]) {
- case '\\':
- new_ch = '\\';
- break;
- case 't':
- new_ch = '\t';
- break;
- case 'n':
- new_ch = '\n';
- break;
- case 's':
- new_ch = ' ';
- break;
- default:
- string msg("unknow n escape character '");
- msg += result[current+1];
- HADOOP_ASSERT(false, msg + "' found in '" + str + "'");
+ case '\\':
+ new_ch = '\\';
+ break;
+ case 't':
+ new_ch = '\t';
+ break;
+ case 'n':
+ new_ch = '\n';
+ break;
+ case 's':
+ new_ch = ' ';
+ break;
+ default:
+ string msg("unknow n escape character '");
+ msg += result[current+1];
+ HADOOP_ASSERT(false, msg + "' found in '" + str + "'");
}
}
result.replace(current, 1 + num_chars, 1, new_ch);
@@ -198,5 +198,5 @@ namespace HadoopUtils {
}
return result;
}
-
+
}
Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A writable pair made of a key and a value, both of which are
+ * {@link WritableComparable}s. Serialization writes the key followed by the
+ * value; ordering compares keys first and falls back to the values.
+ * 
+ * @param <K> the class of key
+ * @param <V> the class of value
+ */
+public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
+    implements WritableComparable<KeyValueWritable<K, V>> {
+
+  protected K key = null;
+  protected V value = null;
+
+  /** Creates an empty pair; key and value stay {@code null} until set. */
+  public KeyValueWritable() {
+  }
+
+  /** Creates a pair holding the given key and value. */
+  public KeyValueWritable(K key, V value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public K getKey() {
+    return this.key;
+  }
+
+  public void setKey(K key) {
+    this.key = key;
+  }
+
+  public V getValue() {
+    return this.value;
+  }
+
+  public void setValue(V value) {
+    this.value = value;
+  }
+
+  /** Fills the existing key and value objects, in that order, from the input. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    key.readFields(in);
+    value.readFields(in);
+  }
+
+  /** Writes the key and then the value to the output. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    key.write(out);
+    value.write(out);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 + (key != null ? key.hashCode() : 0);
+    return 31 * hash + (value != null ? value.hashCode() : 0);
+  }
+
+  /**
+   * Two pairs are equal when they are of the same class and both their keys
+   * and their values are equal (or both {@code null}).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+
+    KeyValueWritable<?, ?> that = (KeyValueWritable<?, ?>) obj;
+    boolean sameKey = (key == null) ? that.key == null : key.equals(that.key);
+    if (!sameKey) {
+      return false;
+    }
+    return (value == null) ? that.value == null : value.equals(that.value);
+  }
+
+  /** Orders by key; pairs with equal keys are ordered by value. */
+  @Override
+  public int compareTo(KeyValueWritable<K, V> obj) {
+    int byKey = key.compareTo(obj.key);
+    return (byKey != 0) ? byKey : value.compareTo(obj.value);
+  }
+
+}
Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * PipesKeyValueWritable containing a Text key-value WritableComparable pair.
+ * Format: delimiter | key | delimiter | value e.g., :Key:Value with delimiter
+ * ':' or |Key|Value with delimiter '|'
+ */
+public final class PipesKeyValueWritable extends KeyValueWritable<Text, Text> {
+
+  // private static final Log LOG =
+  // LogFactory.getLog(PipesKeyValueWritable.class);
+
+  /**
+   * Delimiter between key and value
+   */
+  private char keyValueDelimiter;
+
+  public PipesKeyValueWritable() {
+    super();
+  }
+
+  public PipesKeyValueWritable(Text key, Text value, char keyValueDelimiter) {
+    super(key, value);
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  public char getKeyValueDelimiter() {
+    return keyValueDelimiter;
+  }
+
+  public void setKeyValueDelimiter(char keyValueDelimiter) {
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  /**
+   * Reads one string in the form {@code <delimiter>key<delimiter>value}. The
+   * first character becomes the delimiter, the text up to its next occurrence
+   * becomes the key and everything after that becomes the value.
+   * 
+   * @throws IOException if the string is empty or contains no second
+   *           delimiter, or if reading from the input fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String str = Text.readString(in);
+    // LOG.debug("readFields: '" + str + "'");
+
+    if (str.isEmpty()) {
+      throw new IOException(
+          "Cannot parse key-value pair: input is empty, delimiter is missing.");
+    }
+    this.keyValueDelimiter = str.charAt(0);
+
+    // The delimiter is a literal character. String.split would treat it as a
+    // regular expression and break on '|', '.', '*' and similar characters.
+    int separator = str.indexOf(this.keyValueDelimiter, 1);
+    if (separator < 0) {
+      throw new IOException("Cannot parse key-value pair '" + str
+          + "': delimiter '" + this.keyValueDelimiter
+          + "' between key and value is missing.");
+    }
+    super.setKey(new Text(str.substring(1, separator)));
+    super.setValue(new Text(str.substring(separator + 1)));
+  }
+
+  /** Writes the pair as the single string produced by {@link #toString()}. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // LOG.debug("write: '" + this.toString() + "'");
+    Text.writeString(out, this.toString());
+  }
+
+  /** Returns {@code <delimiter>key<delimiter>value}. */
+  @Override
+  public String toString() {
+    return this.keyValueDelimiter + super.getKey().toString()
+        + this.keyValueDelimiter + super.getValue().toString();
+  }
+
+  /**
+   * Compares keys numerically when both keys are numeric (see
+   * {@link #isNumeric(String)}) and as Text otherwise; pairs whose keys
+   * compare equal are ordered by value.
+   */
+  @Override
+  public int compareTo(KeyValueWritable<Text, Text> obj) {
+    String thisKey = key.toString();
+    String otherKey = obj.key.toString();
+
+    int cmp;
+    if (isNumeric(thisKey) && isNumeric(otherKey)) {
+      // if key is numeric compare numbers
+      cmp = Double.compare(Double.parseDouble(thisKey),
+          Double.parseDouble(otherKey));
+    } else {
+      // else compare key string
+      cmp = key.compareTo(obj.key);
+    }
+    if (cmp != 0) {
+      return cmp;
+    }
+    // if keys are equal compare value
+    return value.compareTo(obj.value);
+  }
+
+  /**
+   * Tells whether the string is a plain decimal number: an optional sign,
+   * optional integer digits, an optional '.' and at least one trailing digit.
+   */
+  public final static boolean isNumeric(String s) {
+    return s.matches("[-+]?\\d*\\.?\\d+");
+  }
+}
Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+
+/**
+ * VectorWritable for Hama Pipes dense vectors. The vector travels as one
+ * string of comma-separated values, e.g. {@code "1.0, 2.5, 3.0"}.
+ */
+public final class PipesVectorWritable extends VectorWritable {
+
+  // private static final Log LOG =
+  // LogFactory.getLog(PipesVectorWritable.class);
+
+  public PipesVectorWritable() {
+    super();
+  }
+
+  public PipesVectorWritable(VectorWritable v) {
+    super(v);
+  }
+
+  public PipesVectorWritable(DoubleVector v) {
+    super(v);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    writeVector(super.getVector(), out);
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    super.set(readVector(in));
+  }
+
+  /**
+   * Writes the vector as a single string with its elements separated by
+   * {@code ", "}. A vector of length 0 is written as the empty string.
+   * 
+   * @param vector the vector to serialize
+   * @param out the output to write the string to
+   * @throws IOException if writing fails
+   */
+  public static void writeVector(DoubleVector vector, DataOutput out)
+      throws IOException {
+    int length = vector.getLength();
+    // A builder avoids re-copying the whole string for every element.
+    StringBuilder str = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      if (i > 0) {
+        str.append(", ");
+      }
+      str.append(vector.get(i));
+    }
+
+    // LOG.debug("writeVector: '" + str + "'");
+    Text.writeString(out, str.toString());
+  }
+
+  /**
+   * Reads a string of comma-separated values and parses it into a dense
+   * vector. A blank string yields a vector of length 0, so that an empty
+   * vector written by {@link #writeVector(DoubleVector, DataOutput)} can be
+   * read back.
+   * 
+   * @param in the input to read the string from
+   * @return the parsed vector
+   * @throws IOException if reading fails
+   * @throws NumberFormatException if an element is not a valid double
+   */
+  public static DoubleVector readVector(DataInput in) throws IOException {
+    String str = Text.readString(in);
+    // LOG.debug("readVector: '" + str + "'");
+
+    if (str.trim().isEmpty()) {
+      // "".split(",") would give one empty element and fail to parse.
+      // NOTE(review): assumes DenseDoubleVector accepts length 0 - confirm.
+      return new DenseDoubleVector(0);
+    }
+
+    String[] values = str.split(",");
+    int len = values.length;
+    DoubleVector vector = new DenseDoubleVector(len);
+    for (int i = 0; i < len; i++) {
+      // parseDouble ignores the blank that follows each comma
+      vector.set(i, Double.parseDouble(values[i]));
+    }
+    return vector;
+  }
+}
Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java (original)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java Sat Nov 23 08:41:09 2013
@@ -28,7 +28,7 @@ import org.apache.hama.commons.math.Doub
/**
* Writable for dense vectors.
*/
-public final class VectorWritable implements WritableComparable<VectorWritable> {
+public class VectorWritable implements WritableComparable<VectorWritable> {
private DoubleVector vector;
@@ -45,12 +45,12 @@ public final class VectorWritable implem
}
@Override
- public final void write(DataOutput out) throws IOException {
+ public void write(DataOutput out) throws IOException {
writeVector(this.vector, out);
}
@Override
- public final void readFields(DataInput in) throws IOException {
+ public void readFields(DataInput in) throws IOException {
this.vector = readVector(in);
}
Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Sat Nov 23 08:41:09 2013
@@ -209,6 +209,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
<executions>
<execution>
<id>add-source</id>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sat Nov 23 08:41:09 2013
@@ -22,10 +22,7 @@ import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -34,10 +31,6 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
-import org.apache.hama.pipes.PipesApplicable;
-import org.apache.hama.pipes.PipesApplication;
-import org.apache.hama.pipes.PipesPartitioner;
-import org.apache.hama.pipes.util.DistributedCacheUtil;
/**
* A BSP job configuration.
@@ -54,9 +47,6 @@ public class BSPJob extends BSPJobContex
private BSPJobClient jobClient;
private RunningJob info;
- private PipesApplication<?, ?, ?, ?, ?> pipesApp = null;
- private static final Log LOG = LogFactory.getLog(BSPJob.class);
-
public BSPJob() throws IOException {
this(new HamaConfiguration());
}
@@ -260,23 +250,6 @@ public class BSPJob extends BSPJobContex
return conf.getBoolean(name, defaultValue);
}
- public final <K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> getPipesApplication() {
- if (pipesApp == null)
- pipesApp = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
-
- return pipesApp;
- }
-
- public void cleanup() {
- try {
- // Close client pipesApplication
- if (this.getPipesApplication() != null)
- this.getPipesApplication().cleanup();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
-
public void setNumBspTask(int tasks) {
conf.setInt("bsp.peers.num", tasks);
}
@@ -416,36 +389,6 @@ public class BSPJob extends BSPJobContex
}
@SuppressWarnings("rawtypes")
- public Partitioner getPartitioner() {
-
- Class<? extends Partitioner> partitionerClass = conf.getClass(
- Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
- Partitioner.class);
-
- LOG.info("DEBUG: " + Constants.RUNTIME_PARTITIONING_CLASS + ": "
- + partitionerClass.toString());
-
- Partitioner partitioner = ReflectionUtils.newInstance(partitionerClass,
- conf);
-
- /* PipesPartitioner usage */
- if (PipesPartitioner.class.equals(partitionerClass)) {
- ((PipesApplicable) partitioner)
- .setApplication(this.getPipesApplication());
-
- try {
- DistributedCacheUtil.moveLocalFiles(conf);
- this.getPipesApplication().start(conf);
- } catch (IOException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
- return partitioner;
- }
-
- @SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
return ReflectionUtils.newInstance(conf.getClass(
Constants.OUTPUT_FORMAT_CLASS, TextOutputFormat.class,
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sat Nov 23 08:41:09 2013
@@ -52,7 +52,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hama.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
@@ -61,6 +60,7 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.hama.ipc.RPC;
/**
* BSPJobClient is the primary interface for the user-job to interact with the
@@ -429,7 +429,8 @@ public class BSPJobClient extends Config
job.get(Constants.RUNTIME_PARTITIONING_CLASS));
}
BSPJob partitioningJob = new BSPJob(conf);
- LOG.debug("partitioningJob input: " + partitioningJob.get(Constants.JOB_INPUT_DIR));
+ LOG.debug("partitioningJob input: "
+ + partitioningJob.get(Constants.JOB_INPUT_DIR));
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
partitioningJob.setInputValueClass(job.getInputValueClass());
@@ -768,9 +769,6 @@ public class BSPJobClient extends Config
// TODO if error found, kill job
// running.killJob();
jc.close();
-
- // Added cleanup for Client PipesApp and DistributedCache
- job.cleanup();
}
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sat Nov 23 08:41:09 2013
@@ -33,8 +33,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.pipes.PipesApplicable;
-import org.apache.hama.pipes.PipesBSP;
/**
* Base class for tasks.
@@ -166,10 +164,6 @@ public final class BSPTask extends Task
LOG.debug("bsp.work.class: " + workClass.toString());
- /* Setup PipesApplication if workClass is matching */
- if (PipesBSP.class.equals(workClass))
- ((PipesApplicable) bsp).setApplication(job.getPipesApplication());
-
// The policy is to throw the first exception and log the remaining.
Exception firstException = null;
try {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sat Nov 23 08:41:09 2013
@@ -18,6 +18,7 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
+import org.apache.hama.pipes.PipesPartitioner;
public class PartitioningRunner extends
BSP<Writable, Writable, Writable, Writable, NullWritable> {
@@ -47,7 +49,8 @@ public class PartitioningRunner extends
private FileSystem fs = null;
private Path partitionDir;
private RecordConverter converter;
- private Map<Integer,LinkedList<KeyValuePair<Writable,Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable,Writable>>>();
+ private Map<Integer, LinkedList<KeyValuePair<Writable, Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable, Writable>>>();
+ private PipesPartitioner<?, ?> pipesPartitioner = null;
@Override
public final void setup(
@@ -103,7 +106,7 @@ public class PartitioningRunner extends
* needed.
*/
public Map<Writable, Writable> newMap();
-
+
/**
* @return a list implementation, so order will not be changed in subclasses
*/
@@ -143,7 +146,7 @@ public class PartitioningRunner extends
@Override
public List<KeyValuePair<Writable, Writable>> newList() {
- return new LinkedList<KeyValuePair<Writable,Writable>>();
+ return new LinkedList<KeyValuePair<Writable, Writable>>();
}
}
@@ -172,17 +175,20 @@ public class PartitioningRunner extends
int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
desiredNum);
-
+
LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
if (list == null) {
- list = (LinkedList<KeyValuePair<Writable, Writable>>) converter.newList();
+ list = (LinkedList<KeyValuePair<Writable, Writable>>) converter
+ .newList();
values.put(index, list);
}
- list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair.getValue()));
+ list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair
+ .getValue()));
}
// The reason of use of Memory is to reduce file opens
- for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values.entrySet()) {
+ for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values
+ .entrySet()) {
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ peer.getPeerIndex());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
@@ -240,15 +246,43 @@ public class PartitioningRunner extends
}
}
+ @Override
+ public void cleanup(
+ BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ throws IOException {
+ if (this.pipesPartitioner != null) {
+ this.pipesPartitioner.cleanup();
+ }
+ }
+
public static int getMergeProcessorID(int partitionID, int peerNum) {
return partitionID % peerNum;
}
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return ReflectionUtils.newInstance(conf.getClass(
+ Class<? extends Partitioner> partitionerClass = conf.getClass(
Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
- Partitioner.class), conf);
+ Partitioner.class);
+
+ LOG.debug(Constants.RUNTIME_PARTITIONING_CLASS + ": "
+ + partitionerClass.toString());
+
+ // Check for Hama Pipes Partitioner
+ Partitioner partitioner = null;
+ if (PipesPartitioner.class.equals(partitionerClass)) {
+ try {
+ Constructor<? extends Partitioner> ctor = partitionerClass
+ .getConstructor(Configuration.class);
+ partitioner = ctor.newInstance(conf);
+ this.pipesPartitioner = (PipesPartitioner) partitioner;
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ } else {
+ partitioner = ReflectionUtils.newInstance(partitionerClass, conf);
+ }
+ return partitioner;
}
private static String getPartitionName(int i) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Sat Nov 23 08:41:09 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.bsp.BSPPeer;
@@ -54,10 +53,9 @@ import org.apache.hama.pipes.protocol.St
* Adapted from Hadoop Pipes.
*
*/
-public class PipesApplication<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
+public class PipesApplication<K1, V1, K2, V2, M extends Writable> {
- private static final Log LOG = LogFactory.getLog(PipesApplication.class
- .getName());
+ private static final Log LOG = LogFactory.getLog(PipesApplication.class);
private ServerSocket serverSocket;
private Process process;
private Socket clientSocket;
@@ -72,7 +70,7 @@ public class PipesApplication<K1 extends
}
/* Build Environment based on the Configuration */
- private Map<String, String> setupEnvironment(Configuration conf)
+ public Map<String, String> setupEnvironment(Configuration conf)
throws IOException {
Map<String, String> env = new HashMap<String, String>();
@@ -220,7 +218,7 @@ public class PipesApplication<K1 extends
clientSocket = serverSocket.accept();
LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
- downlink = new BinaryProtocol<K1, V1, K2, V2>(conf,
+ downlink = new BinaryProtocol<K1, V1, K2, V2, M>(conf,
clientSocket.getOutputStream(), clientSocket.getInputStream());
downlink.start();
@@ -249,8 +247,8 @@ public class PipesApplication<K1 extends
* @throws IOException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public void start(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
- throws IOException, InterruptedException {
+ public void start(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+ InterruptedException {
Map<String, String> env = setupEnvironment(peer.getConfiguration());
List<String> cmd = setupCommand(peer.getConfiguration());
@@ -291,7 +289,7 @@ public class PipesApplication<K1 extends
clientSocket = serverSocket.accept();
LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
- downlink = new BinaryProtocol<K1, V1, K2, V2>(peer,
+ downlink = new BinaryProtocol<K1, V1, K2, V2, M>(peer,
clientSocket.getOutputStream(), clientSocket.getInputStream());
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Sat Nov 23 08:41:09 2013
@@ -32,10 +32,10 @@ import org.apache.hama.bsp.sync.SyncExce
* runtimes.
*/
public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
- extends BSP<K1, V1, K2, V2, BytesWritable> implements PipesApplicable {
+ extends BSP<K1, V1, K2, V2, BytesWritable> {
private static final Log LOG = LogFactory.getLog(PipesBSP.class);
- private PipesApplication<K1, V1, K2, V2, BytesWritable> application;
+ private PipesApplication<K1, V1, K2, V2, BytesWritable> application = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
@Override
public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
@@ -97,12 +97,4 @@ public class PipesBSP<K1 extends Writabl
}
}
- @SuppressWarnings("unchecked")
- @Override
- public void setApplication(
- PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
-
- this.application = (PipesApplication<K1, V1, K2, V2, BytesWritable>) pipesApp;
- }
-
}