You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/09/30 09:26:16 UTC
svn commit: r1527466 [2/2] - in /hama/trunk: ./ c++/src/
c++/src/main/native/examples/ c++/src/main/native/examples/conf/
c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/
c++/src/main/native/pipes/impl/ c++/src/main/native/utils/i...
Modified: hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/HamaPipes.cc?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc Mon Sep 30 07:26:15 2013
@@ -50,11 +50,11 @@ using std::endl;
using namespace HadoopUtils;
namespace HamaPipes {
-
+
bool logging;
/********************************************/
- /****************** BSPJob ******************/
+ /****************** BSPJob ******************/
/********************************************/
class BSPJobImpl: public BSPJob {
private:
@@ -63,37 +63,37 @@ namespace HamaPipes {
void set(const string& key, const string& value) {
values[key] = value;
}
-
+
virtual bool hasKey(const string& key) const {
return values.find(key) != values.end();
}
-
+
virtual const string& get(const string& key) const {
map<string,string>::const_iterator itr = values.find(key);
if (itr == values.end()) {
throw Error("Key " + key + " not found in BSPJob");
- }
+ }
return itr->second;
}
-
+
virtual int getInt(const string& key) const {
const string& val = get(key);
return toInt(val);
}
-
+
virtual float getFloat(const string& key) const {
const string& val = get(key);
return toFloat(val);
}
-
+
virtual bool getBoolean(const string&key) const {
const string& val = get(key);
return toBool(val);
}
};
-
+
/********************************************/
- /************* DownwardProtocol *************/
+ /************* DownwardProtocol *************/
/********************************************/
class DownwardProtocol {
public:
@@ -101,26 +101,26 @@ namespace HamaPipes {
virtual void setBSPJob(vector<string> values) = 0;
virtual void setInputTypes(string keyType, string valueType) = 0;
virtual void setKeyValue(const string& _key, const string& _value) = 0;
-
+
virtual void runBsp(bool pipedInput, bool pipedOutput) = 0;
virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0;
virtual void runSetup(bool pipedInput, bool pipedOutput) = 0;
- virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;
-
+ virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;
+
virtual void setNewResult(int32_t value) = 0;
- virtual void setNewResult(int64_t value) = 0;
+ virtual void setNewResult(int64_t value) = 0;
virtual void setNewResult(const string& value) = 0;
virtual void setNewResult(vector<string> value) = 0;
-
+
//virtual void reduceKey(const string& key) = 0;
//virtual void reduceValue(const string& value) = 0;
virtual void close() = 0;
virtual void abort() = 0;
virtual ~DownwardProtocol() {}
};
-
+
/********************************************/
- /************** UpwardProtocol **************/
+ /************** UpwardProtocol **************/
/********************************************/
class UpwardProtocol {
public:
@@ -129,15 +129,15 @@ namespace HamaPipes {
virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0;
virtual void sendCMD(int32_t cmd, const string& value) = 0;
virtual void sendCMD(int32_t cmd, const string values[], int size) = 0;
-
+
//virtual void registerCounter(int id, const string& group, const string& name) = 0;
//virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
virtual ~UpwardProtocol() {}
};
-
+
/********************************************/
- /***************** Protocol *****************/
+ /***************** Protocol *****************/
/********************************************/
class Protocol {
public:
@@ -145,47 +145,47 @@ namespace HamaPipes {
virtual UpwardProtocol* getUplink() = 0;
virtual ~Protocol(){}
};
-
+
/********************************************/
- /*************** MESSAGE_TYPE ***************/
+ /*************** MESSAGE_TYPE ***************/
/********************************************/
enum MESSAGE_TYPE {
- START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
- RUN_SETUP, RUN_BSP, RUN_CLEANUP,
- READ_KEYVALUE, WRITE_KEYVALUE,
- GET_MSG, GET_MSG_COUNT,
- SEND_MSG, SYNC,
- GET_ALL_PEERNAME, GET_PEERNAME,
- GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
- REOPEN_INPUT, CLEAR,
- CLOSE, ABORT,
- DONE, TASK_DONE,
- REGISTER_COUNTER, INCREMENT_COUNTER,
- SEQFILE_OPEN, SEQFILE_READNEXT,
- SEQFILE_APPEND, SEQFILE_CLOSE,
- PARTITION_REQUEST, PARTITION_RESPONSE
+ START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
+ RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+ READ_KEYVALUE, WRITE_KEYVALUE,
+ GET_MSG, GET_MSG_COUNT,
+ SEND_MSG, SYNC,
+ GET_ALL_PEERNAME, GET_PEERNAME,
+ GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+ REOPEN_INPUT, CLEAR,
+ CLOSE, ABORT,
+ DONE, TASK_DONE,
+ REGISTER_COUNTER, INCREMENT_COUNTER,
+ SEQFILE_OPEN, SEQFILE_READNEXT,
+ SEQFILE_APPEND, SEQFILE_CLOSE,
+ PARTITION_REQUEST, PARTITION_RESPONSE
};
-
+
/* Only needed for debugging output */
const char* messageTypeNames[] = {
- stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
- stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
- stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
- stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
- stringify( SEND_MSG ), stringify( SYNC ),
- stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
- stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
- stringify( REOPEN_INPUT ), stringify( CLEAR ),
- stringify( CLOSE ), stringify( ABORT ),
- stringify( DONE ), stringify( TASK_DONE ),
- stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
- stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
- stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
- stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
- };
-
+ stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
+ stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+ stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
+ stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
+ stringify( SEND_MSG ), stringify( SYNC ),
+ stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+ stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+ stringify( REOPEN_INPUT ), stringify( CLEAR ),
+ stringify( CLOSE ), stringify( ABORT ),
+ stringify( DONE ), stringify( TASK_DONE ),
+ stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+ stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+ stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+ stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
+ };
+
/********************************************/
- /*********** BinaryUpwardProtocol ***********/
+ /*********** BinaryUpwardProtocol ***********/
/********************************************/
class BinaryUpwardProtocol: public UpwardProtocol {
private:
@@ -194,65 +194,69 @@ namespace HamaPipes {
BinaryUpwardProtocol(FILE* _stream) {
stream = new FileOutStream();
HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
-
+
}
-
+
virtual void sendCMD(int32_t cmd) {
serializeInt(cmd, *stream);
stream->flush();
if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
- messageTypeNames[cmd]);
+ messageTypeNames[cmd]);
}
-
+
virtual void sendCMD(int32_t cmd, int32_t value) {
serializeInt(cmd, *stream);
serializeInt(value, *stream);
stream->flush();
- if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",messageTypeNames[cmd],value);
+ if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",
+ messageTypeNames[cmd],value);
}
-
+
virtual void sendCMD(int32_t cmd, const string& value) {
serializeInt(cmd, *stream);
serializeString(value, *stream);
stream->flush();
- if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str());
+ if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",
+ messageTypeNames[cmd],value.c_str());
}
-
+
virtual void sendCMD(int32_t cmd, const string values[], int size) {
- serializeInt(cmd, *stream);
- for (int i=0; i<size; i++) {
+ serializeInt(cmd, *stream);
+ for (int i=0; i<size; i++) {
serializeString(values[i], *stream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+ if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",
+ messageTypeNames[cmd],i+1,values[i].c_str());
}
stream->flush();
}
-
+
virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) {
serializeInt(cmd, *stream);
serializeInt(value, *stream);
- for (int i=0; i<size; i++) {
+ for (int i=0; i<size; i++) {
serializeString(values[i], *stream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
- }
+ if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",
+ messageTypeNames[cmd],i+1,values[i].c_str());
+ }
stream->flush();
}
-
- /*
- virtual void registerCounter(int id, const string& group,
- const string& name) {
- serializeInt(REGISTER_COUNTER, *stream);
- serializeInt(id, *stream);
- serializeString(group, *stream);
- serializeString(name, *stream);
- }
- virtual void incrementCounter(const TaskContext::Counter* counter,
- uint64_t amount) {
- serializeInt(INCREMENT_COUNTER, *stream);
- serializeInt(counter->getId(), *stream);
- serializeLong(amount, *stream);
- }
- */
+ /*
+ virtual void registerCounter(int id, const string& group,
+ const string& name) {
+ serializeInt(REGISTER_COUNTER, *stream);
+ serializeInt(id, *stream);
+ serializeString(group, *stream);
+ serializeString(name, *stream);
+ }
+
+ virtual void incrementCounter(const TaskContext::Counter* counter,
+ uint64_t amount) {
+ serializeInt(INCREMENT_COUNTER, *stream);
+ serializeInt(counter->getId(), *stream);
+ serializeLong(amount, *stream);
+ }
+ */
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
serializeInt(INCREMENT_COUNTER, *stream);
@@ -262,235 +266,258 @@ namespace HamaPipes {
stream->flush();
if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
}
-
+
~BinaryUpwardProtocol() {
delete stream;
}
};
-
+
/********************************************/
- /************** BinaryProtocol **************/
+ /************** BinaryProtocol **************/
/********************************************/
class BinaryProtocol: public Protocol {
private:
FileInStream* downStream;
DownwardProtocol* handler;
BinaryUpwardProtocol * uplink;
-
+
string key;
string value;
-
+
public:
BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
downStream = new FileInStream();
downStream->open(down);
uplink = new BinaryUpwardProtocol(up);
handler = _handler;
-
- //authDone = false;
- //getPassword(password);
}
-
+
UpwardProtocol* getUplink() {
return uplink;
}
-
-
+
+
virtual void nextEvent() {
int32_t cmd;
cmd = deserializeInt(*downStream);
-
- switch (cmd) {
-
- case START_MESSAGE: {
- int32_t prot;
- prot = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot);
- handler->start(prot);
- break;
- }
- /* SET BSP Job Configuration / Environment */
- case SET_BSPJOB_CONF: {
- int32_t entries;
- entries = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries);
- vector<string> result(entries*2);
- for(int i=0; i < entries*2; ++i) {
- string item;
- deserializeString(item, *downStream);
- result.push_back(item);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str());
- }
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries);
- handler->setBSPJob(result);
- break;
- }
- case SET_INPUT_TYPES: {
- string keyType;
- string valueType;
- deserializeString(keyType, *downStream);
- deserializeString(valueType, *downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
- keyType.c_str(),valueType.c_str());
- handler->setInputTypes(keyType, valueType);
- break;
- }
- case READ_KEYVALUE: {
- deserializeString(key, *downStream);
- deserializeString(value, *downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
- key.c_str(),
- ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) );
- handler->setKeyValue(key, value);
- break;
- }
- case RUN_SETUP: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
- int32_t pipedInput;
- int32_t pipedOutput;
- pipedInput = deserializeInt(*downStream);
- pipedOutput = deserializeInt(*downStream);
- handler->runSetup(pipedInput, pipedOutput);
- break;
- }
- case RUN_BSP: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
- int32_t pipedInput;
- int32_t pipedOutput;
- pipedInput = deserializeInt(*downStream);
- pipedOutput = deserializeInt(*downStream);
- handler->runBsp(pipedInput, pipedOutput);
- break;
- }
- case RUN_CLEANUP: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
- int32_t pipedInput;
- int32_t pipedOutput;
- pipedInput = deserializeInt(*downStream);
- pipedOutput = deserializeInt(*downStream);
- handler->runCleanup(pipedInput, pipedOutput);
- break;
- }
-
- case PARTITION_REQUEST: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
- string partionKey;
- string partionValue;
- int32_t numTasks;
- deserializeString(partionKey, *downStream);
- deserializeString(partionValue, *downStream);
- numTasks = deserializeInt(*downStream);
- handler->runPartition(partionKey, partionValue, numTasks);
- break;
- }
-
-
- case GET_MSG_COUNT: {
- int32_t msgCount = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount);
- handler->setNewResult(msgCount);
- break;
- }
- case GET_MSG: {
- string msg;
- deserializeString(msg,*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str());
- handler->setNewResult(msg);
- break;
- }
- case GET_PEERNAME: {
- string peername;
- deserializeString(peername,*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str());
- handler->setNewResult(peername);
- break;
- }
- case GET_ALL_PEERNAME: {
- vector<string> peernames;
- int32_t peernameCount = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount);
- string peername;
- for (int i=0; i<peernameCount; i++) {
+
+ switch (cmd) {
+
+ case START_MESSAGE: {
+ int32_t prot;
+ prot = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot);
+ handler->start(prot);
+ break;
+ }
+ /* SET BSP Job Configuration / Environment */
+ case SET_BSPJOB_CONF: {
+ int32_t entries;
+ entries = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",
+ entries);
+ vector<string> result(entries*2);
+ for(int i=0; i < entries*2; ++i) {
+ string item;
+ deserializeString(item, *downStream);
+ result.push_back(item);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",
+ item.c_str());
+ }
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",
+ entries);
+ handler->setBSPJob(result);
+ break;
+ }
+ case SET_INPUT_TYPES: {
+ string keyType;
+ string valueType;
+ deserializeString(keyType, *downStream);
+ deserializeString(valueType, *downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+ keyType.c_str(),valueType.c_str());
+ handler->setInputTypes(keyType, valueType);
+ break;
+ }
+ case READ_KEYVALUE: {
+ deserializeString(key, *downStream);
+ deserializeString(value, *downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
+ key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str()));
+ handler->setKeyValue(key, value);
+ break;
+ }
+ case RUN_SETUP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runSetup(pipedInput, pipedOutput);
+ break;
+ }
+ case RUN_BSP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runBsp(pipedInput, pipedOutput);
+ break;
+ }
+ case RUN_CLEANUP: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
+ int32_t pipedInput;
+ int32_t pipedOutput;
+ pipedInput = deserializeInt(*downStream);
+ pipedOutput = deserializeInt(*downStream);
+ handler->runCleanup(pipedInput, pipedOutput);
+ break;
+ }
+
+ case PARTITION_REQUEST: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
+ string partionKey;
+ string partionValue;
+ int32_t numTasks;
+ deserializeString(partionKey, *downStream);
+ deserializeString(partionValue, *downStream);
+ numTasks = deserializeInt(*downStream);
+ handler->runPartition(partionKey, partionValue, numTasks);
+ break;
+ }
+
+ case GET_MSG_COUNT: {
+ int32_t msgCount = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",
+ msgCount);
+ handler->setNewResult(msgCount);
+ break;
+ }
+ case GET_MSG: {
+ string msg;
+ deserializeString(msg,*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",
+ msg.c_str());
+ handler->setNewResult(msg);
+ break;
+ }
+ case GET_PEERNAME: {
+ string peername;
deserializeString(peername,*downStream);
- peernames.push_back(peername);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",peername.c_str());
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",
+ peername.c_str());
+ handler->setNewResult(peername);
+ break;
}
- handler->setNewResult(peernames);
- break;
- }
- case GET_PEER_INDEX: {
- int32_t peerIndex = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex);
- handler->setNewResult(peerIndex);
- break;
- }
- case GET_PEER_COUNT: {
- int32_t peerCount = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount);
- handler->setNewResult(peerCount);
- break;
- }
- case GET_SUPERSTEP_COUNT: {
- int64_t superstepCount = deserializeLong(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount);
- handler->setNewResult(superstepCount);
- break;
- }
-
-
- case SEQFILE_OPEN: {
- int32_t fileID = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID);
- handler->setNewResult(fileID);
- break;
- }
- case SEQFILE_READNEXT: {
- deserializeString(key, *downStream);
- deserializeString(value, *downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n",
- key.c_str(),
- ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) );
- handler->setKeyValue(key, value);
- break;
- }
- case SEQFILE_APPEND: {
+ case GET_ALL_PEERNAME: {
+ vector<string> peernames;
+ int32_t peernameCount = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",
+ peernameCount);
+ string peername;
+ for (int i=0; i<peernameCount; i++) {
+ deserializeString(peername,*downStream);
+ peernames.push_back(peername);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",
+ peername.c_str());
+ }
+ handler->setNewResult(peernames);
+ break;
+ }
+ case GET_PEER_INDEX: {
+ int32_t peerIndex = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",
+ peerIndex);
+ handler->setNewResult(peerIndex);
+ break;
+ }
+ case GET_PEER_COUNT: {
+ int32_t peerCount = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",
+ peerCount);
+ handler->setNewResult(peerCount);
+ break;
+ }
+ case GET_SUPERSTEP_COUNT: {
+ int64_t superstepCount = deserializeLong(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",
+ (long)superstepCount);
+ handler->setNewResult(superstepCount);
+ break;
+ }
+
+
+ case SEQFILE_OPEN: {
+ int32_t fileID = deserializeInt(*downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID);
+ handler->setNewResult(fileID);
+ break;
+ }
+ case SEQFILE_READNEXT: {
+ deserializeString(key, *downStream);
+ deserializeString(value, *downStream);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n",
+ key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str()));
+ handler->setKeyValue(key, value);
+ break;
+ }
+ case SEQFILE_APPEND: {
int32_t result = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
handler->setNewResult(result);
break;
- }
- case SEQFILE_CLOSE: {
+ }
+ case SEQFILE_CLOSE: {
int32_t result = deserializeInt(*downStream);
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
+ if(logging)
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
handler->setNewResult(result);
break;
+ }
+
+
+ case CLOSE: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+ handler->close();
+ break;
+ }
+ case ABORT: {
+ if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+ handler->abort();
+ break;
+ }
+ default:
+ HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+ fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd);
}
-
-
- case CLOSE: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
- handler->close();
- break;
- }
- case ABORT: {
- if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
- handler->abort();
- break;
- }
- default:
- HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
- fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd);
- }
- }
-
+ }
+
virtual ~BinaryProtocol() {
delete downStream;
delete uplink;
}
};
-
+
/********************************************/
- /************** BSPContextImpl **************/
+ /************** BSPContextImpl **************/
/********************************************/
class BSPContextImpl: public BSPContext, public DownwardProtocol {
private:
@@ -509,7 +536,7 @@ namespace HamaPipes {
//float progressFloat;
//uint64_t lastProgress;
//bool statusSet;
-
+
Protocol* protocol;
UpwardProtocol *uplink;
@@ -517,36 +544,36 @@ namespace HamaPipes {
RecordReader* reader;
RecordWriter* writer;
-
+
BSP* bsp;
Partitioner* partitioner;
const Factory* factory;
pthread_mutex_t mutexDone;
std::vector<int> registeredCounterIds;
-
+
int32_t resultInt;
- bool isNewResultInt;
+ bool isNewResultInt;
int64_t resultLong;
- bool isNewResultLong;
+ bool isNewResultLong;
string resultString;
- bool isNewResultString;
+ bool isNewResultString;
vector<string> resultVector;
- bool isNewResultVector;
+ bool isNewResultVector;
- bool isNewKeyValuePair;
+ bool isNewKeyValuePair;
string currentKey;
string currentValue;
-
+
public:
-
+
BSPContextImpl(const Factory& _factory) {
//statusSet = false;
done = false;
//newKey = NULL;
factory = &_factory;
job = NULL;
-
+
inputKeyClass = NULL;
inputValueClass = NULL;
@@ -563,26 +590,26 @@ namespace HamaPipes {
//progressFloat = 0.0f;
hasTask = false;
pthread_mutex_init(&mutexDone, NULL);
-
+
isNewResultInt = false;
isNewResultString = false,
isNewResultVector = false;
-
+
isNewKeyValuePair = false;
}
-
-
+
+
/********************************************/
- /*********** DownwardProtocol IMPL **********/
+ /*********** DownwardProtocol IMPL **********/
/********************************************/
virtual void start(int protocol) {
if (protocol != 0) {
- throw Error("Protocol version " + toString(protocol) +
+ throw Error("Protocol version " + toString(protocol) +
" not supported");
}
partitioner = factory->createPartitioner(*this);
}
-
+
virtual void setBSPJob(vector<string> values) {
int len = values.size();
BSPJobImpl* result = new BSPJobImpl();
@@ -592,24 +619,25 @@ namespace HamaPipes {
}
job = result;
}
-
+
virtual void setInputTypes(string keyType, string valueType) {
inputKeyClass = new string(keyType);
inputValueClass = new string(valueType);
}
-
+
virtual void setKeyValue(const string& _key, const string& _value) {
currentKey = _key;
currentValue = _value;
isNewKeyValuePair = true;
}
-
+
/* private Method */
void setupReaderWriter(bool pipedInput, bool pipedOutput) {
-
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
- (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
-
+
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+ (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
+
if (pipedInput && reader==NULL) {
reader = factory->createRecordReader(*this);
HADOOP_ASSERT((reader == NULL) == pipedInput,
@@ -617,10 +645,10 @@ namespace HamaPipes {
"RecordReader not defined");
//if (reader != NULL) {
- // value = new string();
+ // value = new string();
//}
- }
-
+ }
+
if (pipedOutput && writer==NULL) {
writer = factory->createRecordWriter(*this);
HADOOP_ASSERT((writer == NULL) == pipedOutput,
@@ -628,13 +656,13 @@ namespace HamaPipes {
"RecordWriter not defined");
}
}
-
+
virtual void runSetup(bool pipedInput, bool pipedOutput) {
setupReaderWriter(pipedInput,pipedOutput);
- if (bsp == NULL)
+ if (bsp == NULL)
bsp = factory->createBSP(*this);
-
+
if (bsp != NULL) {
hasTask = true;
bsp->setup(*this);
@@ -642,13 +670,13 @@ namespace HamaPipes {
uplink->sendCMD(TASK_DONE);
}
}
-
+
virtual void runBsp(bool pipedInput, bool pipedOutput) {
setupReaderWriter(pipedInput,pipedOutput);
-
- if (bsp == NULL)
- bsp = factory->createBSP(*this);
-
+
+ if (bsp == NULL)
+ bsp = factory->createBSP(*this);
+
if (bsp != NULL) {
hasTask = true;
bsp->bsp(*this);
@@ -656,10 +684,10 @@ namespace HamaPipes {
uplink->sendCMD(TASK_DONE);
}
}
-
+
virtual void runCleanup(bool pipedInput, bool pipedOutput) {
setupReaderWriter(pipedInput,pipedOutput);
-
+
if (bsp != NULL) {
hasTask = true;
bsp->cleanup(*this);
@@ -667,54 +695,56 @@ namespace HamaPipes {
uplink->sendCMD(TASK_DONE);
}
}
-
+
+ /********************************************/
+ /******* Partitioner *******/
/********************************************/
- /******* Partitioner *******/
- /********************************************/
virtual void runPartition(const string& key, const string& value, int32_t numTasks){
- if (partitioner != NULL) {
+ if (partitioner != NULL) {
int part = partitioner->partition(key, value, numTasks);
uplink->sendCMD(PARTITION_RESPONSE, part);
} else {
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
}
- }
-
+ }
+
virtual void setNewResult(int32_t _value) {
resultInt = _value;
- isNewResultInt = true;
+ isNewResultInt = true;
}
-
+
virtual void setNewResult(int64_t _value) {
resultLong = _value;
- isNewResultLong = true;
+ isNewResultLong = true;
}
-
+
virtual void setNewResult(const string& _value) {
resultString = _value;
- isNewResultString = true;
+ isNewResultString = true;
}
-
+
virtual void setNewResult(vector<string> _value) {
resultVector = _value;
- isNewResultVector = true;
+ isNewResultVector = true;
}
-
+
virtual void close() {
pthread_mutex_lock(&mutexDone);
done = true;
hasTask = false;
pthread_mutex_unlock(&mutexDone);
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
(done)?"true":"false",(hasTask)?"true":"false");
}
-
+
virtual void abort() {
throw Error("Aborted by driver");
}
-
+
/********************************************/
- /************** TaskContext IMPL ************/
+ /************** TaskContext IMPL ************/
/********************************************/
/**
@@ -723,78 +753,78 @@ namespace HamaPipes {
virtual const BSPJob* getBSPJob() {
return job;
}
-
+
/**
- * Get the current key.
+ * Get the current key.
* @return the current key or NULL if called before the first map or reduce
*/
//virtual const string& getInputKey() {
// return key;
//}
-
+
/**
- * Get the current value.
- * @return the current value or NULL if called before the first map or
+ * Get the current value.
+ * @return the current value or NULL if called before the first map or
* reduce
*/
//virtual const string& getInputValue() {
// return *value;
//}
-
+
/**
* Register a counter with the given group and name.
*/
/*
- virtual Counter* getCounter(const std::string& group,
- const std::string& name) {
- int id = registeredCounterIds.size();
- registeredCounterIds.push_back(id);
- uplink->registerCounter(id, group, name);
- return new Counter(id);
- }*/
-
+ virtual Counter* getCounter(const std::string& group,
+ const std::string& name) {
+ int id = registeredCounterIds.size();
+ registeredCounterIds.push_back(id);
+ uplink->registerCounter(id, group, name);
+ return new Counter(id);
+ }*/
+
/**
* Increment the value of the counter with the given amount.
*/
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
- uplink->incrementCounter(group, name, amount);
+ uplink->incrementCounter(group, name, amount);
}
-
+
/********************************************/
- /************** BSPContext IMPL *************/
+ /************** BSPContext IMPL *************/
/********************************************/
-
+
/**
* Access the InputSplit of the bsp.
*/
//virtual const string& getInputSplit() {
// return *inputSplit;
//}
-
+
/**
* Get the name of the key class of the input to this task.
*/
virtual const string& getInputKeyClass() {
return *inputKeyClass;
}
-
+
/**
* Get the name of the value class of the input to this task.
*/
virtual const string& getInputValueClass() {
return *inputValueClass;
}
-
+
/**
* Send a data with a tag to another BSPSlave corresponding to hostname.
* Messages sent by this method are not guaranteed to be received in a sent
* order.
*/
virtual void sendMessage(const string& peerName, const string& msg) {
- string values[] = {peerName, msg};
- uplink->sendCMD(SEND_MSG,values, 2);
+ string values[] = {peerName, msg};
+ uplink->sendCMD(SEND_MSG,values, 2);
}
-
+
/**
* @return A message from the peer's received messages queue (a FIFO).
*/
@@ -802,29 +832,31 @@ namespace HamaPipes {
uplink->sendCMD(GET_MSG);
while (!isNewResultString)
- protocol->nextEvent();
-
+ protocol->nextEvent();
+
isNewResultString = false;
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str());
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",
+ resultString.c_str());
return resultString;
}
-
+
/**
* @return The number of messages in the peer's received messages queue.
*/
virtual int getNumCurrentMessages() {
uplink->sendCMD(GET_MSG_COUNT);
-
+
while (!isNewResultInt)
protocol->nextEvent();
isNewResultInt = false;
return resultInt;
}
-
+
/**
* Barrier Synchronization.
- *
+ *
* Sends all the messages in the outgoing message queues to the corresponding
* remote peers.
*/
@@ -834,14 +866,16 @@ namespace HamaPipes {
/**
* @return the name of this peer in the format "hostname:port".
- */
+ */
virtual const string& getPeerName() {
uplink->sendCMD(GET_PEERNAME,-1);
-
+
while (!isNewResultString)
protocol->nextEvent();
-
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",
+ resultString.c_str());
isNewResultString = false;
return resultString;
}
@@ -851,11 +885,13 @@ namespace HamaPipes {
*/
virtual const string& getPeerName(int index) {
uplink->sendCMD(GET_PEERNAME,index);
-
+
while (!isNewResultString)
protocol->nextEvent();
-
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+
+ if(logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",
+ resultString.c_str());
isNewResultString = false;
return resultString;
}
@@ -866,10 +902,10 @@ namespace HamaPipes {
*/
virtual vector<string> getAllPeerNames() {
uplink->sendCMD(GET_ALL_PEERNAME);
-
+
while (!isNewResultVector)
protocol->nextEvent();
-
+
isNewResultVector = false;
return resultVector;
}
@@ -879,39 +915,39 @@ namespace HamaPipes {
*/
virtual int getPeerIndex() {
uplink->sendCMD(GET_PEER_INDEX);
-
+
while (!isNewResultInt)
protocol->nextEvent();
-
+
isNewResultInt = false;
return resultInt;
}
-
+
/**
* @return the number of peers
*/
virtual int getNumPeers() {
uplink->sendCMD(GET_PEER_COUNT);
-
+
while (!isNewResultInt)
protocol->nextEvent();
-
+
isNewResultInt = false;
- return resultInt;
+ return resultInt;
}
-
+
/**
* @return the count of current super-step
*/
virtual long getSuperstepCount() {
uplink->sendCMD(GET_SUPERSTEP_COUNT);
-
+
while (!isNewResultLong)
protocol->nextEvent();
-
+
isNewResultLong = false;
- return resultLong;
- }
+ return resultLong;
+ }
/**
* Clears all queues entries.
@@ -919,17 +955,17 @@ namespace HamaPipes {
virtual void clear() {
uplink->sendCMD(CLEAR);
}
-
+
/**
* Writes a key/value pair to the output collector
*/
virtual void write(const string& key, const string& value) {
- if (writer != NULL) {
- writer->emit(key, value);
- } else {
- string values[] = {key, value};
- uplink->sendCMD(WRITE_KEYVALUE, values, 2);
- }
+ if (writer != NULL) {
+ writer->emit(key, value);
+ } else {
+ string values[] = {key, value};
+ uplink->sendCMD(WRITE_KEYVALUE, values, 2);
+ }
}
/**
@@ -937,21 +973,30 @@ namespace HamaPipes {
*/
virtual bool readNext(string& _key, string& _value) {
uplink->sendCMD(READ_KEYVALUE);
-
+
while (!isNewKeyValuePair)
protocol->nextEvent();
isNewKeyValuePair = false;
-
+
_key = currentKey;
- _value = currentValue;
- if (logging && _key.empty() && _value.empty())
+ // check if value is array [0, 1, 2, ...], and remove brackets
+ int len = currentValue.length();
+ if ( (currentValue[0]=='[') &&
+ (currentValue[len-1]==']') ) {
+ _value = currentValue.substr(1,len-2);
+ } else {
+ _value = currentValue;
+ }
+
+ if (logging && _key.empty() && _value.empty()) {
fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
-
+ }
+
return (!_key.empty() && !_value.empty());
}
-
+
/**
* Closes the input and opens it right away, so that the file pointer is at
* the beginning again.
@@ -959,154 +1004,163 @@ namespace HamaPipes {
virtual void reopenInput() {
uplink->sendCMD(REOPEN_INPUT);
}
-
-
+
+
/********************************************/
- /******* SequenceFileConnector IMPL *******/
- /********************************************/
-
+ /******* SequenceFileConnector IMPL *******/
+ /********************************************/
+
/**
* Open SequenceFile with opion "r" or "w"
* @return the corresponding fileID
*/
- virtual int sequenceFileOpen(const string& path, const string& option,
+ virtual int sequenceFileOpen(const string& path, const string& option,
const string& keyType, const string& valueType) {
- if (logging)
- fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str());
-
- if ( (option.compare("r")==0) || (option.compare("w")==0)) {
-
- string values[] = {path, option, keyType, valueType};
- uplink->sendCMD(SEQFILE_OPEN,values, 4);
+ if (logging)
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",
+ path.c_str());
- while (!isNewResultInt)
- protocol->nextEvent();
+ if ( (option.compare("r")==0) || (option.compare("w")==0)) {
+
+ string values[] = {path, option, keyType, valueType};
+ uplink->sendCMD(SEQFILE_OPEN,values, 4);
+
+ while (!isNewResultInt)
+ protocol->nextEvent();
- isNewResultInt = false;
- return resultInt;
- } else {
- fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str());
- return -1; //Error wrong option
+ isNewResultInt = false;
+ return resultInt;
+ } else {
+ fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",
+ option.c_str());
+ return -1; //Error wrong option
}
}
-
+
/**
* Read next key/value pair from the SequenceFile with fileID
*/
virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) {
-
+
uplink->sendCMD(SEQFILE_READNEXT,fileID);
-
+
while (!isNewKeyValuePair)
protocol->nextEvent();
-
+
isNewKeyValuePair = false;
-
+
_key = currentKey;
- _value = currentValue;
-
- if (logging && _key.empty() && _value.empty())
+ // check if value is array [0, 1, 2, ...], and remove brackets
+ int len = currentValue.length();
+ if ( (currentValue[0]=='[') &&
+ (currentValue[len-1]==']') ) {
+ _value = currentValue.substr(1,len-2);
+ } else {
+ _value = currentValue;
+ }
+
+ if (logging && _key.empty() && _value.empty())
fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n");
-
+
return (!_key.empty() && !_value.empty());
}
-
+
/**
* Append the next key/value pair to the SequenceFile with fileID
*/
virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) {
string values[] = {key, value};
uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2);
-
+
while (!isNewResultInt)
protocol->nextEvent();
-
+
isNewResultInt = false;
return (resultInt==1);
}
-
+
/**
* Close SequenceFile
*/
virtual bool sequenceFileClose(int fileID) {
uplink->sendCMD(SEQFILE_CLOSE,fileID);
-
+
while (!isNewResultInt)
protocol->nextEvent();
-
- if (logging && resultInt==0)
+
+ if (logging && resultInt==0)
fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
else if (logging)
fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
-
+
isNewResultInt = false;
return (resultInt==1);
}
-
+
/********************************************/
- /*************** Other STUFF ***************/
+ /*************** Other STUFF ***************/
/********************************************/
-
+
void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
- protocol = _protocol;
- uplink = _uplink;
+ protocol = _protocol;
+ uplink = _uplink;
}
-
+
bool isDone() {
- pthread_mutex_lock(&mutexDone);
- bool doneCopy = done;
- pthread_mutex_unlock(&mutexDone);
- return doneCopy;
+ pthread_mutex_lock(&mutexDone);
+ bool doneCopy = done;
+ pthread_mutex_unlock(&mutexDone);
+ return doneCopy;
}
-
+
/**
* Advance to the next value.
*/
/*
- bool nextValue() {
- if (isNewKey || done) {
- return false;
- }
- isNewValue = false;
- //progress();
- protocol->nextEvent();
- return isNewValue;
- }
- */
+ bool nextValue() {
+ if (isNewKey || done) {
+ return false;
+ }
+ isNewValue = false;
+ //progress();
+ protocol->nextEvent();
+ return isNewValue;
+ }
+ */
void waitForTask() {
- while (!done && !hasTask) {
- if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
- (done)?"true":"false",(hasTask)?"true":"false");
- protocol->nextEvent();
- }
+ while (!done && !hasTask) {
+ if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+ (done)?"true":"false",(hasTask)?"true":"false");
+ protocol->nextEvent();
+ }
}
/*
- bool nextKey() {
- if (reader == NULL) {
- while (!isNewKey) {
- nextValue();
- if (done) {
- return false;
- }
- }
- key = *newKey;
- } else {
- if (!reader->next(key, const_cast<string&>(*value))) {
- pthread_mutex_lock(&mutexDone);
- done = true;
- pthread_mutex_unlock(&mutexDone);
- return false;
- }
- //progressFloat = reader->getProgress();
- }
- isNewKey = false;
-
- if (bsp != NULL) {
- bsp->bsp(*this);
- }
- return true;
- }
- */
+ bool nextKey() {
+ if (reader == NULL) {
+ while (!isNewKey) {
+ nextValue();
+ if (done) {
+ return false;
+ }
+ }
+ key = *newKey;
+ } else {
+ if (!reader->next(key, const_cast<string&>(*value))) {
+ pthread_mutex_lock(&mutexDone);
+ done = true;
+ pthread_mutex_unlock(&mutexDone);
+ return false;
+ }
+ //progressFloat = reader->getProgress();
+ }
+ isNewKey = false;
+
+ if (bsp != NULL) {
+ bsp->bsp(*this);
+ }
+ return true;
+ }
+ */
void closeAll() {
if (reader) {
reader->close();
@@ -1115,12 +1169,12 @@ namespace HamaPipes {
if (bsp) {
bsp->close();
}
-
+
if (writer) {
writer->close();
}
}
-
+
virtual ~BSPContextImpl() {
delete job;
delete inputKeyClass;
@@ -1135,9 +1189,9 @@ namespace HamaPipes {
pthread_mutex_destroy(&mutexDone);
}
};
-
+
/**
- * Ping the parent every 5 seconds to know if it is alive
+ * Ping the parent every 5 seconds to know if it is alive
*/
void* ping(void* ptr) {
BSPContextImpl* context = (BSPContextImpl*) ptr;
@@ -1156,11 +1210,13 @@ namespace HamaPipes {
addr.sin_family = AF_INET;
addr.sin_port = htons(toInt(portStr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
- if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr);
+ if(logging)
+ fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n",
+ portStr);
HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
string("problem connecting command socket: ") +
strerror(errno));
-
+
}
if (sock != -1) {
int result = shutdown(sock, SHUT_RDWR);
@@ -1171,8 +1227,8 @@ namespace HamaPipes {
remaining_retries = MAX_RETRIES;
} catch (Error& err) {
if (!context->isDone()) {
- fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
- err.getMessage().c_str());
+ fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
+ err.getMessage().c_str());
remaining_retries -= 1;
if (remaining_retries == 0) {
exit(1);
@@ -1184,23 +1240,25 @@ namespace HamaPipes {
}
return NULL;
}
-
+
/**
* Run the assigned task in the framework.
- * The user's main function should set the various functions using the
+ * The user's main function should set the various functions using the
* set* functions above and then call this.
* @return true, if the task succeeded.
*/
bool runTask(const Factory& factory) {
try {
HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!");
-
- logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;
- if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false");
-
+
+ logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;
+ if(logging)
+ fprintf(stderr,"HamaPipes::runTask - logging is: %s\n",
+ ((logging)?"true":"false"));
+
BSPContextImpl* context = new BSPContextImpl(factory);
Protocol* connection;
-
+
char* portStr = getenv("hama.pipes.command.port");
int sock = -1;
FILE* stream = NULL;
@@ -1218,10 +1276,10 @@ namespace HamaPipes {
HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
string("problem connecting command socket: ") +
strerror(errno));
-
+
stream = fdopen(sock, "r");
outStream = fdopen(sock, "w");
-
+
// increase buffer size
int bufsize = 128*1024;
int setbuf;
@@ -1229,14 +1287,16 @@ namespace HamaPipes {
bufout = new char[bufsize];
setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
- + strerror(errno));
+ + strerror(errno));
setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
- + strerror(errno));
-
+ + strerror(errno));
+
connection = new BinaryProtocol(stream, context, outStream);
- if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr);
-
+ if(logging)
+ fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n",
+ portStr);
+
} else if (getenv("hama.pipes.command.file")) {
char* filename = getenv("hama.pipes.command.file");
string outFilename = filename;
@@ -1249,18 +1309,18 @@ namespace HamaPipes {
fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n");
return -1;
}
-
+
context->setProtocol(connection, connection->getUplink());
-
+
//pthread_t pingThread;
//pthread_create(&pingThread, NULL, ping, (void*)(context));
context->waitForTask();
-
+
//while (!context->isDone()) {
- //context->nextKey();
+ //context->nextKey();
//}
-
+
context->closeAll();
connection->getUplink()->sendCMD(DONE);
@@ -1286,12 +1346,12 @@ namespace HamaPipes {
}
if (outStream != NULL) {
//fclose(outStream);
- }
+ }
delete bufin;
delete bufout;
return true;
} catch (Error& err) {
- fprintf(stderr, "Hama Pipes Exception: %s\n",
+ fprintf(stderr, "Hama Pipes Exception: %s\n",
err.getMessage().c_str());
return false;
}
Added: hama/trunk/c++/src/main/native/utils/impl/Splitter.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/Splitter.cc?rev=1527466&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/Splitter.cc (added)
+++ hama/trunk/c++/src/main/native/utils/impl/Splitter.cc Mon Sep 30 07:26:15 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "hadoop/Splitter.hh"
+
+#include <string>
+#include <vector>
+
+namespace HadoopUtils {
+
+
+ Splitter::Splitter ( const std::string& src, const std::string& delim ) {
+ reset ( src, delim );
+ }
+
+ std::string& Splitter::operator[] ( size_type i ) {
+ return _tokens.at ( i );
+ }
+
+ Splitter::size_type Splitter::size() {
+ return _tokens.size();
+ }
+
+ void Splitter::reset ( const std::string& src, const std::string& delim ) {
+ std::vector<std::string> tokens;
+ std::string::size_type start = 0;
+ std::string::size_type end;
+
+ for ( ; ; ) {
+ end = src.find ( delim, start );
+ tokens.push_back ( src.substr ( start, end - start ) );
+
+ // We just copied the last token
+ if ( end == std::string::npos )
+ break;
+
+ // Exclude the delimiter in the next search
+ start = end + delim.size();
+ }
+
+ _tokens.swap ( tokens );
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Mon Sep 30 07:26:15 2013
@@ -213,9 +213,6 @@ public class PipesApplication<K1 extends
LOG.debug("DEBUG: cmd: " + cmd);
process = runClient(cmd, env); // fork c++ binary
- LOG.debug("DEBUG: waiting for Client at "
- + serverSocket.getLocalSocketAddress());
-
try {
if (!streamingEnabled) {
LOG.debug("DEBUG: waiting for Client at "
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Mon Sep 30 07:26:15 2013
@@ -35,8 +35,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.pipes.protocol.UplinkReader;
-import org.apache.hama.pipes.protocol.StreamingProtocol.StreamingUplinkReaderThread;
import org.apache.hama.util.KeyValuePair;
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Mon Sep 30 07:26:15 2013
@@ -315,10 +315,10 @@ public class UplinkReader<K1 extends Wri
WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
if (pair != null) {
- binProtocol.writeObject(pair.getKey());
- binProtocol.writeObject(pair.getValue());
-
+ binProtocol.writeObject(new Text(pair.getKey().toString()));
String valueStr = pair.getValue().toString();
+ binProtocol.writeObject(new Text(valueStr));
+
LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
+ pair.getKey()
+ " Value: "