You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/13 11:19:47 UTC
svn commit: r1550674 - in /hama/trunk: ./ c++/ c++/src/main/native/examples/
c++/src/main/native/pipes/api/hama/ c++/src/main/native/pipes/impl/
core/src/main/java/org/apache/hama/pipes/
core/src/main/java/org/apache/hama/pipes/protocol/ core/src/main/...
Author: millecker
Date: Fri Dec 13 10:19:46 2013
New Revision: 1550674
URL: http://svn.apache.org/r1550674
Log:
HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes
Modified:
hama/trunk/CHANGES.txt
hama/trunk/c++/pom.xml
hama/trunk/c++/src/main/native/examples/README.txt
hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Dec 13 10:19:46 2013
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes (Martin Illecker)
HAMA-808: Hama Pipes Testcase (Martin Illecker)
HAMA-828: Improve code, fix typo and modify unclear comment in org.apache.hama.ml.ann package (Yexi Jiang)
HAMA-699: Add commons module (Martin Illecker)
Modified: hama/trunk/c++/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pom.xml?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/pom.xml (original)
+++ hama/trunk/c++/pom.xml Fri Dec 13 10:19:46 2013
@@ -79,7 +79,7 @@
</else>
</if>
</target>
- <exportAntProperties>true</exportAntProperties>
+ <exportAntProperties>true</exportAntProperties>
</configuration>
</execution>
<!-- TODO wire here native testcases
@@ -103,7 +103,7 @@
</dependency>
</dependencies>
</plugin>
- <plugin>
+ <plugin>
<groupId>org.codehaus.groovy.maven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>1.0</version>
Modified: hama/trunk/c++/src/main/native/examples/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/README.txt?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/README.txt (original)
+++ hama/trunk/c++/src/main/native/examples/README.txt Fri Dec 13 10:19:46 2013
@@ -37,7 +37,7 @@ Run summation example
View input and output data
% hadoop fs -cat /examples/input/summation/input.txt
-% hama seqdumper -seqFile /examples/output/summation/part-00000
+% hama seqdumper -file /examples/output/summation/part-00000
You should see
# Input Path: /examples/output/summation/part-00000
@@ -68,7 +68,7 @@ Run piestimator example because no input
View output data
-% hama seqdumper -seqFile /examples/output/piestimator/part-00001
+% hama seqdumper -file /examples/output/piestimator/part-00001
You should see
# Input Path: /examples/output/piestimator/part-00001
@@ -111,14 +111,14 @@ Run matrixmultiplication example
View input and output data
% hama seqdumper \
- -seqFile /examples/input/matrixmultiplication/MatrixA.seq
+ -file /examples/input/matrixmultiplication/MatrixA.seq
% hama seqdumper \
- -seqFile \
+ -file \
/examples/input/matrixmultiplication/MatrixB_transposed.seq
-% hama seqdumper -seqFile \
- /examples/output/matrixmultiplication/part-00001
+% hama seqdumper \
+ -file /examples/output/matrixmultiplication/part-00001
Delete output folder
Modified: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Fri Dec 13 10:19:46 2013
@@ -238,6 +238,7 @@ namespace HamaPipes {
* Register a counter with the given group and name.
*/
//virtual Counter* getCounter(const string& group, const string& name) = 0;
+ virtual long getCounter(const string& group, const string& name) = 0;
/**
* Increment the value of the counter with the given amount.
@@ -329,6 +330,7 @@ namespace HamaPipes {
}
virtual void nextEvent() = 0;
+ virtual bool verifyResult(int32_t expected_response_cmd) = 0;
virtual UpwardProtocol<BinaryUpwardProtocol>* getUplink() = 0;
virtual ~Protocol(){}
};
Modified: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Fri Dec 13 10:19:46 2013
@@ -28,9 +28,9 @@ public:
HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
}
- /* local function */
+ /* local sendCommand function */
void sendCommand(int32_t cmd, bool flush) {
- serializeInt(cmd, *out_stream_);
+ serialize<int32_t>(cmd, *out_stream_);
if (flush) {
out_stream_->flush();
}
@@ -111,25 +111,25 @@ public:
/*
virtual void registerCounter(int id, const string& group,
const string& name) {
- serializeInt(REGISTER_COUNTER, *stream);
- serializeInt(id, *stream);
- serializeString(group, *stream);
- serializeString(name, *stream);
+ serialize<int32_t>(REGISTER_COUNTER, *stream);
+ serialize<int32_t>(id, *stream);
+ serialize<string>(group, *stream);
+ serialize<string>(name, *stream);
}
virtual void incrementCounter(const TaskContext::Counter* counter,
uint64_t amount) {
- serializeInt(INCREMENT_COUNTER, *stream);
- serializeInt(counter->getId(), *stream);
- serializeLong(amount, *stream);
+ serialize<int32_t>(INCREMENT_COUNTER, *stream);
+ serialize<int32_t>(counter->getId(), *stream);
+ serialize<int64_t>(amount, *stream);
}
*/
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
- serializeInt(INCREMENT_COUNTER, *out_stream_);
- serializeString(group, *out_stream_);
- serializeString(name, *out_stream_);
- serializeLong(amount, *out_stream_);
+ serialize<int32_t>(INCREMENT_COUNTER, *out_stream_);
+ serialize<string>(group, *out_stream_);
+ serialize<string>(name, *out_stream_);
+ serialize<int64_t>(amount, *out_stream_);
out_stream_->flush();
if(logging) {
fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
@@ -173,7 +173,7 @@ public:
cmd = deserializeInt(*in_stream_);
switch (cmd) {
-
+
case START_MESSAGE: {
int32_t protocol_version;
protocol_version = deserialize<int32_t>(*in_stream_);
@@ -293,6 +293,17 @@ public:
}
/**
+ * Check for valid response command
+ */
+ bool verifyResult(int32_t expected_response_cmd) {
+ int32_t response = deserialize<int32_t>(*in_stream_);
+ if (response != expected_response_cmd) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Wait for next event, which should be a response for
* a previously sent command (expected_response_cmd)
* and return the generic result
@@ -303,14 +314,13 @@ public:
T result = T();
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize<int32_t>(*in_stream_);
// check if response is expected
if (expected_response_cmd == cmd) {
switch (cmd) {
-
+
case GET_MSG_COUNT: {
T msg_count;
msg_count = deserialize<T>(*in_stream_);
@@ -362,7 +372,7 @@ public:
}
return superstep_count;
}
-
+
case SEQFILE_OPEN: {
T file_id = deserialize<T>(*in_stream_);
if(logging) {
@@ -421,8 +431,7 @@ public:
vector<T> results;
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize<int32_t>(*in_stream_);
// check if response is expected
if (expected_response_cmd == cmd) {
@@ -467,14 +476,13 @@ public:
KeyValuePair<K,V> key_value_pair;
// read response command
- int32_t cmd;
- cmd = deserializeInt(*in_stream_);
+ int32_t cmd = deserialize<int32_t>(*in_stream_);
// check if response is expected or END_OF_DATA
if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
switch (cmd) {
-
+
case READ_KEYVALUE: {
K key = deserialize<K>(*in_stream_);
V value = deserialize<V>(*in_stream_);
@@ -727,20 +735,27 @@ public:
/**
* Register a counter with the given group and name.
*/
- /*
- virtual Counter* getCounter(const std::string& group,
- const std::string& name) {
- int id = registeredCounterIds.size();
- registeredCounterIds.push_back(id);
- uplink->registerCounter(id, group, name);
- return new Counter(id);
- }*/
+ virtual long getCounter(const string& group, const string& name) {
+ // TODO
+ // int id = registeredCounterIds.size();
+ // registeredCounterIds.push_back(id);
+ // uplink->registerCounter(id, group, name);
+ // return new Counter(id);
+ return 0;
+ }
/**
- * Increment the value of the counter with the given amount.
+ * Increments the counter identified by the group and counter name by the
+ * specified amount.
*/
virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
uplink_->incrementCounter(group, name, amount);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(INCREMENT_COUNTER);
+ if (response == false) {
+ throw Error("incrementCounter received wrong response!");
+ }
}
/********************************************/
@@ -775,6 +790,12 @@ public:
*/
virtual void sendMessage(const string& peer_name, const M& msg) {
uplink_->sendCommand<string,M>(SEND_MSG, peer_name, msg);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(SEND_MSG);
+ if (response == false) {
+ throw Error("sendMessage received wrong response!");
+ }
}
/**
@@ -815,6 +836,12 @@ public:
*/
virtual void sync() {
uplink_->sendCommand(SYNC);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(SYNC);
+ if (response == false) {
+ throw Error("sync received wrong response!");
+ }
}
/**
@@ -911,6 +938,12 @@ public:
*/
virtual void clear() {
uplink_->sendCommand(CLEAR);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(CLEAR);
+ if (response == false) {
+ throw Error("clear received wrong response!");
+ }
}
/**
@@ -918,10 +951,16 @@ public:
*/
virtual void write(const K2& key, const V2& value) {
if (writer_ != NULL) {
- writer_->emit(key, value);
+ writer_->emit(key, value); // TODO writer not implemented
} else {
uplink_->sendCommand<K2,V2>(WRITE_KEYVALUE, key, value);
}
+
+ // Verify response command
+ bool response = protocol_->verifyResult(WRITE_KEYVALUE);
+ if (response == false) {
+ throw Error("write received wrong response!");
+ }
}
/**
@@ -952,6 +991,12 @@ public:
*/
virtual void reopenInput() {
uplink_->sendCommand(REOPEN_INPUT);
+
+ // Verify response command
+ bool response = protocol_->verifyResult(REOPEN_INPUT);
+ if (response == false) {
+ throw Error("reopenInput received wrong response!");
+ }
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes;
import java.io.BufferedReader;
@@ -361,12 +360,12 @@ public class PipesApplication<K1, V1, K2
*
* @throws IOException
*/
- public void cleanup() throws IOException {
+ public void cleanup(boolean sendClose) throws IOException {
if (serverSocket != null) {
serverSocket.close();
}
try {
- if (downlink != null) {
+ if ((downlink != null) && (sendClose)) {
downlink.close();
}
} catch (InterruptedException ie) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Dec 13 10:19:46 2013
@@ -93,7 +93,7 @@ public class PipesBSP<K1 extends Writabl
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
- this.application.cleanup();
+ this.application.cleanup(true);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java Fri Dec 13 10:19:46 2013
@@ -31,12 +31,11 @@ import org.apache.hama.bsp.TextInputForm
/**
* Dummy input format used when non-Java a {@link RecordReader} is used by the
- * Pipes' application.
+ * Pipes application.
*
- * The only useful thing this does is set up the Map-Reduce job to get the
- * {@link PipesDummyRecordReader}, everything else left for the 'actual'
- * InputFormat specified by the user which is given by
- * <i>mapred.pipes.user.inputformat</i>.
+ * The only useful thing this does is set up the BSP job to get the
+ * PipesDummyRecordReader, everything else left for the 'actual' InputFormat
+ * specified by the user which is given by <i>hama.pipes.user.inputformat</i>.
*
* Adapted from Hadoop Pipes.
*
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java Fri Dec 13 10:19:46 2013
@@ -49,7 +49,7 @@ public class PipesPartitioner<K, V> impl
public void cleanup() {
try {
- application.cleanup();
+ application.cleanup(true);
} catch (IOException e) {
LOG.error(e);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Dec 13 10:19:46 2013
@@ -214,7 +214,7 @@ public class Submitter implements Tool {
* Submit a job to the cluster. All of the necessary modifications to the job
* to run under pipes are made to the configuration.
*
- * @param conf the job to submit to the cluster (MODIFIED)
+ * @param job to submit to the cluster
* @throws IOException
*/
public static void runJob(BSPJob job) throws IOException {
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.BufferedOutputStream;
@@ -50,8 +49,7 @@ import org.apache.hama.pipes.Submitter;
public class BinaryProtocol<K1, V1, K2, V2, M extends Writable> implements
DownwardProtocol<K1, V1, K2, V2> {
- protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
- .getName());
+ protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class);
public static final int CURRENT_PROTOCOL_VERSION = 0;
/**
* The buffer size for the command socket
@@ -74,7 +72,7 @@ public class BinaryProtocol<K1, V1, K2,
* Upward messages are passed on the specified handler and downward downward
* messages are public methods on this object.
*
- * @param jobConfig The job's configuration
+ * @param conf The job's configuration
* @param out The output stream to communicate on.
* @param in The input stream to communicate on.
* @throws IOException
@@ -362,8 +360,8 @@ public class BinaryProtocol<K1, V1, K2,
* @throws IOException
*/
protected void writeObject(Writable obj) throws IOException {
- // For basic types IntWritable, LongWritable, FloatWritable, DoubleWritable,
- // Text and BytesWritable, encode them directly, so that they end up
+ // For basic types IntWritable, LongWritable, Text and BytesWritable,
+ // encode them directly, so that they end up
// in C++ as the natural translations.
if (obj instanceof Text) {
Text t = (Text) obj;
@@ -383,20 +381,8 @@ public class BinaryProtocol<K1, V1, K2,
} else if (obj instanceof LongWritable) {
WritableUtils.writeVLong(this.outStream, ((LongWritable) obj).get());
- // else if ((obj instanceof FloatWritable) || (obj instanceof
- // DoubleWritable))
-
} else {
- // Note: other types are transfered as String which should be implemented
- // in Writable itself
-
- // DataOutputBuffer buffer = new DataOutputBuffer();
- // buffer.reset();
- // obj.write(buffer);
- // int length = buffer.getLength();
- // WritableUtils.writeVInt(stream, length);
- // stream.write(buffer.getData(), 0, length);
-
+ // Note: FloatWritable and DoubleWritable are written here
obj.write(this.outStream);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.IOException;
@@ -42,6 +41,7 @@ public interface DownwardProtocol<K1, V1
/**
* Set the BSP Job Configuration
*
+ * @param conf The job's configuration
* @throws IOException
*/
void setBSPJobConf(Configuration conf) throws IOException;
@@ -58,6 +58,8 @@ public interface DownwardProtocol<K1, V1
/**
* runSetup
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -65,6 +67,8 @@ public interface DownwardProtocol<K1, V1
/**
* runBsp
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -72,6 +76,8 @@ public interface DownwardProtocol<K1, V1
/**
* runCleanup
*
+ * @param pipedInput use pipedInput
+ * @param pipedOutput use pipedOutput
* @throws IOException
*/
void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -79,6 +85,9 @@ public interface DownwardProtocol<K1, V1
/**
* getPartition
*
+ * @param key
+ * @param value
+ * @param numTasks number of available tasks
* @throws IOException
*/
int getPartition(K1 key, V1 value, int numTasks) throws IOException;
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java Fri Dec 13 10:19:46 2013
@@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
/**
* The integer codes to represent the different messages. These must match the
* C++ codes or massive confusion will result.
- *
*/
public enum MessageType {
START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4),
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Fri Dec 13 10:19:46 2013
@@ -45,8 +45,6 @@ import org.apache.hama.commons.util.KeyV
*
* @param <K1> input key.
* @param <V1> input value.
- * @param <K2> output key.
- * @param <V2> output value.
*/
public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
extends BinaryProtocol<K1, V1, Text, Text, BytesWritable> {
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.protocol;
import java.io.BufferedInputStream;
@@ -100,8 +99,9 @@ public class UplinkReader<KEYIN, VALUEIN
}
int cmd = readCommand();
- if (cmd == -1)
+ if (cmd == -1) {
continue;
+ }
LOG.debug("Handling uplink command: " + MessageType.values()[cmd]);
if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
@@ -114,8 +114,8 @@ public class UplinkReader<KEYIN, VALUEIN
} else if (cmd == MessageType.REGISTER_COUNTER.code
&& isPeerAvailable()) { // INCOMING
/*
- * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip
- * transferring group and name each INCREMENT
+ * Is not used in Hama. Hadoop Pipes uses it - maybe for performance
+ * issue, skip transferring group and name each INCREMENT
*/
} else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
synchronized (binProtocol.hasTaskLock) {
@@ -149,8 +149,8 @@ public class UplinkReader<KEYIN, VALUEIN
} else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING
reopenInput();
} else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING
- LOG.debug("Got MessageType.CLEAR");
- peer.clear();
+ clear();
+
/* SequenceFileConnector Implementation */
} else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
seqFileOpen();
@@ -161,6 +161,7 @@ public class UplinkReader<KEYIN, VALUEIN
} else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
seqFileClose();
/* SequenceFileConnector Implementation */
+
} else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
partitionResponse();
} else {
@@ -192,7 +193,22 @@ public class UplinkReader<KEYIN, VALUEIN
public void reopenInput() throws IOException {
LOG.debug("Got MessageType.REOPEN_INPUT");
+
peer.reopenInput();
+
+ WritableUtils.writeVInt(this.outStream, MessageType.REOPEN_INPUT.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.REOPEN_INPUT");
+ }
+
+ public void clear() throws IOException {
+ LOG.debug("Got MessageType.CLEAR");
+
+ peer.clear();
+
+ WritableUtils.writeVInt(this.outStream, MessageType.CLEAR.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.CLEAR");
}
public void getSuperstepCount() throws IOException {
@@ -257,7 +273,12 @@ public class UplinkReader<KEYIN, VALUEIN
public void sync() throws IOException, SyncException, InterruptedException {
LOG.debug("Got MessageType.SYNC");
+
peer.sync(); // this call blocks
+
+ WritableUtils.writeVInt(this.outStream, MessageType.SYNC.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.SYNC");
}
public void getMessage() throws IOException {
@@ -282,10 +303,20 @@ public class UplinkReader<KEYIN, VALUEIN
}
public void incrementCounter() throws IOException {
+ LOG.debug("Got MessageType.INCREMENT_COUNTER");
+
String group = Text.readString(this.inStream);
String name = Text.readString(this.inStream);
long amount = WritableUtils.readVLong(this.inStream);
+
+ LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: "
+ + name + " amount: " + amount);
+
peer.incrementCounter(group, name, amount);
+
+ WritableUtils.writeVInt(this.outStream, MessageType.INCREMENT_COUNTER.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.INCREMENT_COUNTER");
}
@SuppressWarnings("unchecked")
@@ -303,7 +334,11 @@ public class UplinkReader<KEYIN, VALUEIN
peer.send(peerName, message);
- LOG.debug("Done MessageType.SEND_MSG to peerName: "
+ WritableUtils.writeVInt(this.outStream, MessageType.SEND_MSG.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.SEND_MSG");
+
+ LOG.debug("Sent message to peerName: "
+ peerName
+ " messageClass: "
+ message.getClass().getName()
@@ -362,13 +397,18 @@ public class UplinkReader<KEYIN, VALUEIN
Object.class), conf);
LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: "
- + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName());
+ + keyOut.getClass().getName() + " valueOutClass: "
+ + valueOut.getClass().getName());
readObject((Writable) keyOut);
readObject((Writable) valueOut);
peer.write(keyOut, valueOut);
+ WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code);
+ binProtocol.flush();
+ LOG.debug("Responded MessageType.WRITE_KEYVALUE");
+
LOG.debug("Done MessageType.WRITE_KEYVALUE -"
+ " Key: "
+ ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut
@@ -385,6 +425,7 @@ public class UplinkReader<KEYIN, VALUEIN
// key and value class stored in the SequenceFile
String keyClass = Text.readString(this.inStream);
String valueClass = Text.readString(this.inStream);
+ LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path);
LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass);
LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass);
@@ -418,8 +459,10 @@ public class UplinkReader<KEYIN, VALUEIN
sequenceKeyWritable, sequenceValueWritable)));
} catch (IOException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
} catch (ClassNotFoundException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
}
@@ -451,10 +494,14 @@ public class UplinkReader<KEYIN, VALUEIN
sequenceKeyWritable, sequenceValueWritable)));
} catch (IOException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
} catch (ClassNotFoundException e) {
+ LOG.error("SEQFILE_OPEN - " + e.getMessage());
fileID = -1;
}
+ } else { // wrong option
+ LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'");
}
WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
@@ -518,9 +565,9 @@ public class UplinkReader<KEYIN, VALUEIN
// check if fileID is available in sequenceFileWriter
if (sequenceFileWriters.containsKey(fileID)) {
- Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue()
+ Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue()
.getKey();
- Writable sequenceValueWritable = sequenceFileReaders.get(fileID)
+ Writable sequenceValueWritable = sequenceFileWriters.get(fileID)
.getValue().getValue();
// try to read key and value
@@ -555,6 +602,7 @@ public class UplinkReader<KEYIN, VALUEIN
public void seqFileClose() throws IOException {
int fileID = WritableUtils.readVInt(this.inStream);
+ LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileID);
boolean result = false;
@@ -594,11 +642,9 @@ public class UplinkReader<KEYIN, VALUEIN
*/
protected void readObject(Writable obj) throws IOException {
byte[] buffer;
-
// For BytesWritable and Text, use the specified length to set the length
// this causes the "obvious" translations to work. So that if you emit
// a string "abc" from C++, it shows up as "abc".
-
if (obj instanceof Text) {
int numBytes = WritableUtils.readVInt(this.inStream);
buffer = new byte[numBytes];
@@ -612,29 +658,22 @@ public class UplinkReader<KEYIN, VALUEIN
((BytesWritable) obj).set(buffer, 0, numBytes);
} else if (obj instanceof IntWritable) {
- LOG.debug("read IntWritable");
((IntWritable) obj).set(WritableUtils.readVInt(this.inStream));
} else if (obj instanceof LongWritable) {
((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
- // else if ((obj instanceof FloatWritable) || (obj instanceof
- // DoubleWritable))
-
} else if (obj instanceof NullWritable) {
throw new IOException("Cannot read data into NullWritable!");
} else {
- // Note: other types are transfered as String which should be implemented
- // in Writable itself
try {
- LOG.debug("reading other type");
+ LOG.debug("reading type: " + obj.getClass().getName());
+
// try reading object
obj.readFields(this.inStream);
- // String s = Text.readString(inStream);
} catch (IOException e) {
-
throw new IOException("Hama Pipes is not able to read "
+ obj.getClass().getName(), e);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java Fri Dec 13 10:19:46 2013
@@ -43,6 +43,7 @@ public class DistributedCacheUtil {
* symbolic links for URIs specified with a fragment if
* DistributedCache.getSymlinks() is true.
*
+ * @param conf The job's configuration
* @throws IOException If a DistributedCache file cannot be found.
*/
public static final void moveLocalFiles(Configuration conf)
@@ -81,8 +82,8 @@ public class DistributedCacheUtil {
}
if (files.length() > 0) {
// I've replaced the use of the missing setLocalFiles and
- // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils methods
- // which set the cache configurations directly.
+ // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils
+ // methods which set the cache configurations directly.
DistCacheUtils.addLocalFiles(conf, files.toString());
}
}
@@ -90,8 +91,8 @@ public class DistributedCacheUtil {
/**
* Add the Files to HDFS
*
- * @param conf
- * @param paths
+ * @param conf The job's configuration
+ * @param files Paths that should be transfered to HDFS
*/
public static String addFilesToHDFS(Configuration conf, String files) {
if (files == null)
@@ -139,8 +140,7 @@ public class DistributedCacheUtil {
/**
* Add the JARs from the given HDFS paths to the Classpath
*
- * @param conf
- * @param urls
+ * @param conf The job's configuration
*/
public static URL[] addJarsToJobClasspath(Configuration conf) {
URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs();
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hama.pipes.util;
import java.io.FileWriter;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
-//import org.apache.hama.util.GenericOptionsParser;
public class SequenceFileDumper {
@@ -63,28 +61,23 @@ public class SequenceFileDumper {
Option option = OptionBuilder.withArgName(name).hasArgs(1)
.withDescription(description).isRequired(required).create();
options.addOption(option);
-
}
Parser createParser() {
- Parser result = new BasicParser();
- return result;
+ return new BasicParser();
}
void printUsage() {
// The CLI package should do this for us, but I can't figure out how
// to make it print something reasonable.
System.out.println("hama seqdumper");
+ System.out.println(" [-file <path>] // The sequence file to read");
System.out
- .println(" [-seqFile <path>] // The Sequence File containing the Clusters");
- System.out
- .println(" [-output <path>] // The output file. If not specified, dumps to the console");
+ .println(" [-output <path>] // The output file. If not specified, dumps to the console");
System.out
- .println(" [-substring <number> // The number of chars of the FormatString() to print");
+ .println(" [-substring <number> // The number of chars of value to print");
System.out.println(" [-count <true>] // Report the count only");
- System.out.println(" [-help] // Print out help");
System.out.println();
- //GenericOptionsParser.printGenericCommandUsage(System.out);
}
}
@@ -95,36 +88,22 @@ public class SequenceFileDumper {
return;
}
- LOG.info("DEBUG: Hama SequenceFileDumper started!");
-
- cli.addOption("seqFile", false,
- "The Sequence File containing the Clusters", "path");
+ // Add arguments
+ cli.addOption("file", false, "The Sequence File containing the Clusters",
+ "path");
cli.addOption("output", false,
"The output file. If not specified, dumps to the console", "path");
-
cli.addOption("substring", false,
"The number of chars of the FormatString() to print", "number");
cli.addOption("count", false, "Report the count only", "number");
- cli.addOption("help", false, "Print out help", "class");
Parser parser = cli.createParser();
-
try {
HamaConfiguration conf = new HamaConfiguration();
-
- //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args);
-
CommandLine cmdLine = parser.parse(cli.options, args);
- // genericParser.getRemainingArgs());
- LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs());
-
- if (cmdLine.hasOption("help")) {
- cli.printUsage();
- return;
- }
- if (cmdLine.hasOption("seqFile")) {
- Path path = new Path(cmdLine.getOptionValue("seqFile"));
+ if (cmdLine.hasOption("file")) {
+ Path path = new Path(cmdLine.getOptionValue("file"));
FileSystem fs = FileSystem.get(path.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
@@ -135,6 +114,7 @@ public class SequenceFileDumper {
} else {
writer = new OutputStreamWriter(System.out);
}
+
writer.append("Input Path: ").append(String.valueOf(path))
.append(LINE_SEP);
@@ -143,10 +123,9 @@ public class SequenceFileDumper {
sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
}
- boolean countOnly = cmdLine.hasOption("count");
-
Writable key = (Writable) reader.getKeyClass().newInstance();
Writable value = (Writable) reader.getValueClass().newInstance();
+
writer.append("Key class: ")
.append(String.valueOf(reader.getKeyClass()))
.append(" Value Class: ").append(String.valueOf(value.getClass()))
@@ -154,6 +133,7 @@ public class SequenceFileDumper {
writer.flush();
long count = 0;
+ boolean countOnly = cmdLine.hasOption("count");
if (countOnly == false) {
while (reader.next(key, value)) {
writer.append("Key: ").append(String.valueOf(key));
@@ -166,7 +146,8 @@ public class SequenceFileDumper {
}
writer.append("Count: ").append(String.valueOf(count))
.append(LINE_SEP);
- } else {
+
+ } else { // count only
while (reader.next(key, value)) {
count++;
}
@@ -179,10 +160,13 @@ public class SequenceFileDumper {
writer.close();
}
reader.close();
+
+ } else {
+ cli.printUsage();
}
} catch (ParseException e) {
- LOG.info("Error : " + e);
+ LOG.error(e.getMessage());
cli.printUsage();
return;
}