You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2012/08/05 13:07:49 UTC
svn commit: r1369551 [3/3] - in /hama/branches/HAMA-505-branch: conf/
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/ft/
core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...
Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Aug 5 11:07:48 2012
@@ -17,7 +17,18 @@
*/
package org.apache.hama.bsp;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -25,19 +36,30 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
-import org.apache.hama.bsp.message.type.ByteMessage;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
+import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.KeyValuePair;
public class TestCheckpoint extends TestCase {
@@ -45,130 +67,578 @@ public class TestCheckpoint extends Test
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
- @SuppressWarnings({ "unchecked", "rawtypes" })
+  /**
+   * Minimal in-memory {@link MessageManager} used by the checkpoint tests.
+   * Messages added through {@link #addMessage(Text)} are kept in a list and
+   * reported to the registered {@link MessageEventListener}; the bundle passed
+   * to {@link #loopBackMessages(BSPMessageBundle)} is retained so that a test
+   * can inspect it. Sending and transfer operations do nothing.
+   */
+  public static class TestMessageManager implements MessageManager<Text> {
+
+    List<Text> messageQueue = new ArrayList<Text>();
+    BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
+    // Created lazily by the first getCurrentMessage() call.
+    Iterator<Text> iter = null;
+    // Stays null until registerListener() is called.
+    MessageEventListener<Text> listener;
+
+    /** Nothing to initialize for the in-memory stub. */
+    @Override
+    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
+        Configuration conf, InetSocketAddress peerAddress) {
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Returns the next queued message, or {@code null} when the iterator is
+     * exhausted.
+     */
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      // NOTE(review): the iterator is created once; ArrayList iterators are
+      // fail-fast, so calling addMessage() after the first read and then
+      // reading again would throw ConcurrentModificationException.
+      if (iter == null) {
+        iter = this.messageQueue.iterator();
+      }
+      if (iter.hasNext()) {
+        return iter.next();
+      }
+      return null;
+    }
+
+    /** Outgoing messages are discarded. */
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public void finishSendPhase() throws IOException {
+    }
+
+    /** There are no outgoing queues, so no iterator is provided. */
+    @Override
+    public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+      return null;
+    }
+
+    /** Bundles are never transferred anywhere. */
+    @Override
+    public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
+        throws IOException {
+    }
+
+    @Override
+    public void clearOutgoingQueues() {
+    }
+
+    /** Returns the number of messages added so far. */
+    @Override
+    public int getNumCurrentMessages() {
+      return this.messageQueue.size();
+    }
+
+    /** Returns the bundle most recently handed to loopBackMessages(). */
+    public BSPMessageBundle<Text> getLoopbackBundle() {
+      return this.loopbackBundle;
+    }
+
+    /**
+     * Queues a message as if it had been received and notifies the registered
+     * listener, if any.
+     */
+    public void addMessage(Text message) throws IOException {
+      this.messageQueue.add(message);
+      // A listener is optional: without this guard a test that never
+      // registered one failed with a NullPointerException here.
+      if (listener != null) {
+        listener.onMessageReceived(message);
+      }
+    }
+
+    /** Remembers the bundle so that the test can read it back. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+      this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
+    }
+
+    /** Single loop-back messages are ignored. */
+    @Override
+    public void loopBackMessage(Writable message) {
+    }
+
+    /** Stores the listener notified by addMessage(); replaces any earlier one. */
+    @Override
+    public void registerListener(MessageEventListener<Text> listener)
+        throws IOException {
+      this.listener = listener;
+    }
+
+  }
+
+  /**
+   * Stub {@link BSPPeer} that only drives the fault tolerance service: the
+   * constructor builds the service through
+   * {@link AsyncRcvdMsgCheckpointImpl} and {@link #sync()} advances the
+   * superstep counter and calls the service's after-barrier hook. All
+   * messaging, I/O and counter methods are inert.
+   */
+  public static class TestBSPPeer implements
+      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
+
+    Configuration conf;
+    long superstepCount;
+    FaultTolerantPeerService<Text> fService;
+
+    /**
+     * Creates the peer and initializes its fault tolerance service.
+     *
+     * @param superstep superstep to start from; values that are not positive
+     *          make the local counter start at 0, while the raw value is
+     *          still passed on to the fault tolerance service
+     * @param state task state passed to the service's onPeerInitialized()
+     */
+    public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
+        Counters counters, long superstep, BSPPeerSyncClient syncClient,
+        MessageManager<Text> messenger, TaskStatus.State state) {
+      this.conf = conf;
+      this.superstepCount = superstep > 0 ? superstep : 0L;
+
+      try {
+        fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
+            job, (BSPPeer<?, ?, ?, ?, Text>) this, syncClient, null, taskId,
+            superstep, conf, messenger);
+        this.fService.onPeerInitialized(state);
+      } catch (Exception e) {
+        // Construction stays best-effort, but the failure is reported through
+        // the test log instead of a bare stack trace on stderr.
+        LOG.error("Error initializing fault tolerance service.", e);
+      }
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    /** Always returns a fresh "data" message. */
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      return new Text("data");
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return 1;
+    }
+
+    /**
+     * Simulates a barrier: increments the superstep counter and invokes the
+     * fault tolerance service's after-barrier hook.
+     */
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      ++superstepCount;
+      try {
+        this.fService.afterBarrier();
+      } catch (Exception e) {
+        LOG.error("Error in fault tolerance service after barrier "
+            + superstepCount, e);
+      }
+      LOG.info("After barrier " + superstepCount);
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superstepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return null;
+    }
+
+    @Override
+    public String getPeerName(int index) {
+      return null;
+    }
+
+    /** Fixed peer index; the tests build their sync keys from it. */
+    @Override
+    public int getPeerIndex() {
+      return 1;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return null;
+    }
+
+    @Override
+    public int getNumPeers() {
+      return 0;
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException {
+    }
+
+    @Override
+    public boolean readNext(NullWritable key, NullWritable value)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public KeyValuePair<NullWritable, NullWritable> readNext()
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+    }
+
+    // NOTE(review): returns null although the constructor stores conf -
+    // confirm that no caller relies on the peer's configuration.
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return null;
+    }
+
+    @Override
+    public void incrementCounter(Enum<?> key, long amount) {
+    }
+
+    @Override
+    public void incrementCounter(String group, String counter, long amount) {
+    }
+
+  }
+
+  /**
+   * In-memory {@link BSPPeerSyncClient} for the checkpoint tests. Stored
+   * values live in a {@link HashMap} keyed by the strings produced by
+   * {@link #constructKey(BSPJobID, String...)}; barrier, registration and
+   * server operations only log or do nothing.
+   */
+  public static class TempSyncClient extends BSPPeerSyncClient {
+
+    Map<String, Writable> valueMap = new HashMap<String, Writable>();
+
+    /**
+     * Builds "jobId/arg1/arg2/.../": the job id and every argument are each
+     * followed by a slash, so the key always ends with one.
+     */
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      // The builder never leaves this method, so the unsynchronized
+      // StringBuilder is sufficient.
+      StringBuilder buffer = new StringBuilder(100);
+      buffer.append(jobId.toString()).append("/");
+      for (String arg : args) {
+        buffer.append(arg).append("/");
+      }
+      return buffer.toString();
+    }
+
+    /**
+     * Stores the value under the key and logs its first two entries. The
+     * value is cast to an {@link ArrayWritable} whose first two elements are
+     * {@link LongWritable}s (logged as step and count); any other value fails
+     * with a runtime exception before being stored.
+     */
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      ArrayWritable writables = (ArrayWritable) value;
+      long step = ((LongWritable) writables.get()[0]).get();
+      long count = ((LongWritable) writables.get()[1]).get();
+
+      LOG.info("SyncClient Storing value step = " + step + " count = " + count
+          + " for key " + key);
+      valueMap.put(key, value);
+      return true;
+    }
+
+    /**
+     * Copies the value stored under the key into valueHolder by serializing
+     * it to a byte buffer and reading it back.
+     *
+     * @return true if the key exists and the copy succeeded; false if the key
+     *         is unknown or an I/O error occurred (the error is logged)
+     */
+    @Override
+    public boolean getInformation(String key,
+        Writable valueHolder) {
+      LOG.info("Getting value for key " + key);
+      if (!valueMap.containsKey(key)) {
+        return false;
+      }
+      Writable value = valueMap.get(key);
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        DataInputStream diStream = new DataInputStream(
+            new ByteArrayInputStream(byteStream.toByteArray()));
+        valueHolder.readFields(diStream);
+        return true;
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+        return false;
+      } finally {
+        try {
+          // Closing the wrapper also closes the byte stream underneath it.
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+    }
+
+    /** Registers the key with a {@link NullWritable} placeholder value. */
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      valueMap.put(key, NullWritable.get());
+      return true;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      return valueMap.containsKey(key);
+    }
+
+    /** Returns every stored key that starts with the given key plus "/". */
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      // NOTE(review): keys built by constructKey() already end with "/", so
+      // such a parent is matched against "parent//" - confirm this is
+      // intended.
+      String prefix = key + "/";
+      List<String> list = new ArrayList<String>();
+      for (String keyVal : valueMap.keySet()) {
+        if (keyVal.startsWith(prefix)) {
+          list.add(keyVal);
+        }
+      }
+      return list.toArray(new String[list.size()]);
+    }
+
+    /** Listeners are not supported by this stub. */
+    @Override
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    /** Removes the key from the map. */
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      valueMap.remove(key);
+      // NOTE(review): false is returned even when an entry was removed -
+      // verify against the PeerSyncClient contract.
+      return false;
+    }
+
+    @Override
+    public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+        throws Exception {
+    }
+
+    @Override
+    public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Enter barrier called - " + superstep);
+    }
+
+    @Override
+    public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Exit barrier called - " + superstep);
+    }
+
+    @Override
+    public void register(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public String[] getAllPeerNames(TaskAttemptID taskId) {
+      return null;
+    }
+
+    @Override
+    public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public void stopServer() {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  /**
+   * Asserts that the checkpoint record stored for the peer in the sync client
+   * holds the expected superstep number and message count.
+   *
+   * @param step expected first entry of the record (superstep number)
+   * @param count expected second entry of the record (message count)
+   */
+  private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask, BSPJob job, long step, long count) {
+
+    ArrayWritable record = new ArrayWritable(LongWritable.class);
+    String checkpointKey = syncClient.constructKey(job.getJobID(),
+        "checkpoint", "" + bspTask.getPeerIndex());
+
+    // The record must exist before its fields can be compared.
+    assertTrue(syncClient.getInformation(checkpointKey, record));
+
+    LongWritable storedStep = (LongWritable) record.get()[0];
+    LongWritable storedCount = (LongWritable) record.get()[1];
+
+    assertEquals(step, storedStep.get());
+    assertEquals(count, storedCount.get());
+  }
+
+  /**
+   * Runs six supersteps with a checkpoint interval of 2 and checks after each
+   * barrier which (superstep, message count) record the sync client holds:
+   * the record is expected to change after supersteps 1, 3 and 5 only.
+   */
+  public void testCheckpointInterval() throws Exception {
+    Configuration config = new Configuration();
+    System.setProperty("user.dir", "/tmp");
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    try {
+      BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+      TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1),
+          1);
+
+      TestMessageManager messenger = new TestMessageManager();
+      PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+          .getPeerSyncClient(config);
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(),
+          -1L, (BSPPeerSyncClient) syncClient, messenger,
+          TaskStatus.State.RUNNING);
+
+      assertNotNull("BSPPeerImpl should not be null.", bspTask);
+
+      LOG.info("Created bsp peer and other parameters");
+      int port = BSPNetUtils.getFreePort(12502);
+      LOG.info("Got port = " + port);
+
+      // Nothing has been checkpointed before the first barrier.
+      boolean result = syncClient.getInformation(
+          syncClient.constructKey(job.getJobID(), "checkpoint",
+              "" + bspTask.getPeerIndex()),
+          new ArrayWritable(LongWritable.class));
+
+      assertFalse(result);
+
+      bspTask.sync();
+      // Superstep 1
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+      Text txtMessage = new Text("data");
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync();
+      // Superstep 2
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync();
+      // Superstep 3
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+      bspTask.sync();
+      // Superstep 4
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+      messenger.addMessage(txtMessage);
+      messenger.addMessage(txtMessage);
+
+      bspTask.sync();
+      // Superstep 5
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+      bspTask.sync();
+      // Superstep 6
+
+      checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+    } finally {
+      // Clean up even when an assertion fails, so that leftover checkpoint
+      // files cannot influence the other tests.
+      dfs.delete(new Path("checkpoint"), true);
+    }
+  }
+
+ /**
+ * Runs two supersteps with checkpointing enabled and verifies both the
+ * record kept in the sync client and the message written to the
+ * checkpoint file of superstep 2.
+ */
+ @SuppressWarnings("rawtypes")
public void testCheckpoint() throws Exception {
Configuration config = new Configuration();
config.set(SyncServiceFactory.SYNC_PEER_CLASS,
- LocalBSPRunner.LocalSyncClient.class.getName());
+ TempSyncClient.class.getName());
+ config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+ config.set(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class.getName());
+ config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+ int port = BSPNetUtils.getFreePort(12502);
+ LOG.info("Got port = " + port);
+
+ config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ config.setInt(Constants.PEER_PORT, port);
+
config.set("bsp.output.dir", "/tmp/hama-test_out");
+ config.set("bsp.local.dir", "/tmp/hama-test");
+
FileSystem dfs = FileSystem.get(config);
+ BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+ TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+ TestMessageManager messenger = new TestMessageManager();
+ PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+ .getPeerSyncClient(config);
+ BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+ (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
- BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
- bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
- new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
- "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
assertNotNull("BSPPeerImpl should not be null.", bspTask);
- if (dfs.mkdirs(new Path("checkpoint"))) {
- if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
- if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
- ;
- }
- }
- assertTrue("Make sure directory is created.",
- dfs.exists(new Path(checkpointedDir)));
- byte[] tmpData = "data".getBytes();
- BSPMessageBundle bundle = new BSPMessageBundle();
- bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
- assertNotNull("Message bundle can not be null.", bundle);
- assertNotNull("Configuration should not be null.", config);
- bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
- bundle);
- FSDataInputStream in = dfs.open(new Path(checkpointedDir
- + "/attempt_201110302255_0001_000000_0"));
- BSPMessageBundle bundleRead = new BSPMessageBundle();
- bundleRead.readFields(in);
- in.close();
- ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
- String content = new String(byteMsg.getData());
- LOG.info("Saved checkpointed content is " + content);
- assertTrue("Message content should be the same.", "data".equals(content));
+
+ LOG.info("Created bsp peer and other parameters");
+
+ bspTask.sync();
+ LOG.info("Completed first sync.");
+
+ checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+ Text txtMessage = new Text("data");
+ messenger.addMessage(txtMessage);
+
+ bspTask.sync();
+
+ LOG.info("Completed second sync.");
+
+ checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);
+
+ // Checking the messages for superstep 2 and peer id 1
+ String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
+ FSDataInputStream in = dfs.open(new Path(expectedPath));
+ try {
+ // The file is read as a UTF class name followed by the message fields.
+ String className = in.readUTF();
+ Text message = (Text) ReflectionUtils.newInstance(Class.forName(className),
+ config);
+ message.readFields(in);
+
+ assertEquals("data", message.toString());
+ } finally {
+ // Release the stream even when reading or the assertion fails.
+ in.close();
+ }
+
dfs.delete(new Path("checkpoint"), true);
}
- public void testCheckpointInterval() throws Exception {
+ /**
+ * Seeds the sync client with a checkpoint record (superstep 3, 5 messages)
+ * and a matching checkpoint file, then creates a peer in RECOVERING state
+ * and verifies that the five messages reach the message manager's
+ * loop-back bundle.
+ */
+ public void testPeerRecovery() throws Exception {
+ Configuration config = new Configuration();
+ config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+ TempSyncClient.class.getName());
+ config.set(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class.getName());
+ config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+ int port = BSPNetUtils.getFreePort(12502);
+ LOG.info("Got port = " + port);
- Configuration conf = new Configuration();
- conf.set("bsp.output.dir", "/tmp/hama-test_out");
- conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
- LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
-
- conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
-
- int port = BSPNetUtils.getFreePort(5000);
- InetSocketAddress inetAddress = new InetSocketAddress(port);
- MinimalGroomServer groom = new MinimalGroomServer(conf);
- Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
- inetAddress.getPort(), conf);
- workerServer.start();
-
- LOG.info("Started RPC server");
- conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
- conf.setInt("bsp.peers.num", 1);
-
- BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
- conf);
- LOG.info("Started the proxy connections");
-
- TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
- "job_201110102255", 1), 1), 1);
-
- try {
- BSPJob job = new BSPJob(new HamaConfiguration(conf));
- job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
- job.setOutputFormat(TextOutputFormat.class);
- final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
- new InetSocketAddress("127.0.0.1", port), conf);
+ config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ config.setInt(Constants.PEER_PORT, port);
- BSPTask task = new BSPTask();
- task.setConf(job);
+ config.set("bsp.output.dir", "/tmp/hama-test_out");
+ config.set("bsp.local.dir", "/tmp/hama-test");
- @SuppressWarnings("rawtypes")
- BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
- proto, 0, null, null, new Counters());
+ FileSystem dfs = FileSystem.get(config);
+ BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+ TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
- bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
- TaskStatus.State.RUNNING, "running", "127.0.0.1",
- TaskStatus.Phase.STARTING, new Counters()));
-
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
- conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
- conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
-
- bspPeer.sync();
-
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), true);
-
- job.setCheckPointInterval(5);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
- bspPeer.sync();
- LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
- + bspPeer.getSuperstepCount());
- assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
- } catch (Exception e) {
- LOG.error("Error testing BSPPeer.", e);
- } finally {
- umbilical.close();
- Thread.sleep(2000);
- workerServer.stop();
- Thread.sleep(2000);
+ TestMessageManager messenger = new TestMessageManager();
+ PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+ .getPeerSyncClient(config);
+
+ Text txtMessage = new Text("data");
+ String writeKey = "job_checkpttest_0001/checkpoint/1/";
+
+ // Checkpoint record: superstep 3 with 5 messages.
+ Writable[] writableArr = new Writable[2];
+ writableArr[0] = new LongWritable(3L);
+ writableArr[1] = new LongWritable(5L);
+ ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+ arrWritable.set(writableArr);
+ syncClient.storeInformation(writeKey, arrWritable, true, null);
+
+ // Checkpoint file: five records of class name plus serialized message.
+ String writePath = "checkpoint/job_checkpttest_0001/3/1";
+ FSDataOutputStream out = dfs.create(new Path(writePath));
+ for (int i = 0; i < 5; ++i) {
+ out.writeUTF(txtMessage.getClass().getCanonicalName());
+ txtMessage.write(out);
}
+ out.close();
+ @SuppressWarnings("unused")
+ BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId,
+ new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger,
+ TaskStatus.State.RECOVERING);
+
+ BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
+ assertEquals(5, bundleRead.getMessages().size());
+ String recoveredMsg = bundleRead.getMessages().get(0).toString();
+ // JUnit expects (expected, actual); the reversed order produced a
+ // misleading failure message.
+ assertEquals("data", recoveredMsg);
+ dfs.delete(new Path("checkpoint"), true);
}
+
}
Added: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java (added)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * Tests the task allocation strategy: a split located on three hosts must be
+ * offered exactly those hosts' names among the selected grooms.
+ */
+public class TestTaskAllocation extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestTaskAllocation.class);
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Registers grooms host0..host9 and checks that the grooms selected by
+   * {@link BestEffortDataLocalTaskAllocator} include every location of the
+   * split (host6, host4 and host3).
+   */
+  public void testBestEffortDataLocality() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    // Build a split whose data lives on three of the ten grooms.
+    String[] splitHosts = new String[] { "host6", "host4", "host3" };
+    String payload = "data";
+    RawSplit split = new RawSplit();
+    split.setLocations(splitHosts);
+    split.setBytes(payload.getBytes(), 0, payload.getBytes().length);
+    split.setDataLength(payload.getBytes().length);
+
+    assertEquals(payload.getBytes().length, (int) split.getDataLength());
+
+    Map<GroomServerStatus, Integer> tasksPerGroom = new HashMap<GroomServerStatus, Integer>(
+        20);
+    BSPResource[] resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    TaskInProgress taskInProgress = new TaskInProgress(job.getJobID(),
+        "job.xml", split, conf, jobProgress, 1);
+
+    Map<String, GroomServerStatus> groomsByName = new HashMap<String, GroomServerStatus>(
+        20);
+
+    // Ten grooms, none of them running a task yet.
+    int groomIndex = 0;
+    while (groomIndex < 10) {
+      String groomName = "host" + groomIndex;
+      GroomServerStatus groom = new GroomServerStatus(groomName,
+          new ArrayList<TaskStatus>(), 0, 3);
+      groomsByName.put(groomName, groom);
+      tasksPerGroom.put(groom, 0);
+      ++groomIndex;
+    }
+
+    Class<? extends TaskAllocationStrategy> strategyClass = conf.getClass("",
+        BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(
+        strategyClass, conf);
+
+    String[] hosts = strategy.selectGrooms(groomsByName, tasksPerGroom,
+        resources, taskInProgress);
+
+    List<String> selected = new ArrayList<String>();
+    for (String host : hosts) {
+      selected.add(host);
+    }
+
+    assertTrue(selected.contains("host6"));
+    assertTrue(selected.contains("host3"));
+    assertTrue(selected.contains("host4"));
+
+  }
+
+}
Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sun Aug 5 11:07:48 2012
@@ -23,7 +23,9 @@ import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
@@ -41,6 +43,7 @@ public class TestZooKeeper extends TestC
public TestZooKeeper() {
configuration = new HamaConfiguration();
+ System.setProperty("user.dir", "/tmp");
configuration.set("bsp.master.address", "localhost");
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
@@ -144,12 +147,32 @@ public class TestZooKeeper extends TestC
Log.info("Passed the key presence test");
- Writable value = masterClient
- .getInformation(masterClient.constructKey(jobID, "info", "level2"),
- IntWritable.class);
-
- assertEquals(null, value);
- Log.info("Passed the null value check.");
+ boolean result = masterClient
+ .getInformation(masterClient.constructKey(jobID, "info", "level3"),
+ new IntWritable());
+
+ assertEquals(false, result);
+
+ Writable[] writableArr = new Writable[2];
+ writableArr[0] = new LongWritable(3L);
+ writableArr[1] = new LongWritable(5L);
+ ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+ arrWritable.set(writableArr);
+ masterClient.storeInformation(
+ masterClient.constructKey(jobID, "info", "level3"),
+ arrWritable, true, null);
+
+ ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
+
+ boolean getResult = masterClient.getInformation(
+ masterClient.constructKey(jobID, "info", "level3"), valueHolder);
+
+ assertTrue(getResult);
+
+ assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
+ assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
+
+ Log.info("Passed array writable test");
done = true;
} catch (Exception e) {
Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Sun Aug 5 11:07:48 2012
@@ -17,7 +17,6 @@
*/
package org.apache.hama.bsp.sync;
-import java.io.File;
import java.util.concurrent.Executors;
import junit.framework.TestCase;
@@ -36,38 +35,36 @@ import org.apache.hama.util.BSPNetUtils;
public class TestSyncServiceFactory extends TestCase {
public static final Log LOG = LogFactory.getLog(TestCase.class);
-
- public static class ListenerTest extends ZKSyncEventListener{
+
+ public static class ListenerTest extends ZKSyncEventListener {
private Text value;
-
- public ListenerTest(){
+
+ public ListenerTest() {
value = new Text("init");
}
-
- public String getValue(){
+
+ public String getValue() {
return value.toString();
}
-
+
@Override
public void onDelete() {
- // TODO Auto-generated method stub
-
+
}
@Override
public void onChange() {
LOG.info("ZK value changed event triggered.");
value.set("Changed");
-
+
}
@Override
public void onChildKeySetChange() {
- // TODO Auto-generated method stub
-
+
}
-
+
}
public void testClientInstantiation() throws Exception {
@@ -96,7 +93,6 @@ public class TestSyncServiceFactory exte
@Override
public void run() {
- // TODO Auto-generated method stub
try {
server.start();
} catch (Exception e) {
@@ -109,10 +105,13 @@ public class TestSyncServiceFactory exte
public void testZKSyncStore() throws Exception {
Configuration conf = new Configuration();
int zkPort = BSPNetUtils.getFreePort(21811);
+ conf.set("bsp.local.dir", "/tmp/hama-test");
+ conf.set("bsp.output.dir", "/tmp/hama-test_out");
conf.setInt(Constants.PEER_PORT, zkPort);
conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+ System.setProperty("user.dir", "/tmp");
// given null, should return zookeeper
final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
syncServer.init(conf);
@@ -123,8 +122,8 @@ public class TestSyncServiceFactory exte
Thread.sleep(1000);
- final PeerSyncClient syncClient = (PeerSyncClient)
- SyncServiceFactory.getPeerSyncClient(conf);
+ final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory
+ .getPeerSyncClient(conf);
assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
BSPJobID jobId = new BSPJobID("abc", 1);
TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
@@ -142,69 +141,45 @@ public class TestSyncServiceFactory exte
}
});
- try {
- IntWritable data = new IntWritable(5);
- syncClient.storeInformation(
- syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
- true, null);
-
- ListenerTest listenerTest = new ListenerTest();
-
-
- syncClient.registerListener(
- syncClient.constructKey(jobId, String.valueOf(1L), "test"),
- ZKSyncEventFactory.getValueChangeEvent(),
- listenerTest);
-
- IntWritable value = (IntWritable) syncClient.getInformation(
- syncClient.constructKey(jobId, String.valueOf(1L), "test"),
- IntWritable.class);
- assertTrue(value != null);
- int intVal = value == null ? 0 : value.get();
- assertTrue(intVal == data.get());
-
- data.set(6);
- syncClient.storeInformation(
- syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
- true, null);
- value = (IntWritable) syncClient.getInformation(
- syncClient.constructKey(jobId, String.valueOf(1L), "test"),
- IntWritable.class);
-
-
- intVal = value == null ? 0 : value.get();
- assertTrue(intVal == data.get());
-
- Thread.sleep(5000);
-
- assertEquals(true, listenerTest.getValue().equals("Changed"));
-
-
- syncServer.stopServer();
- } finally {
-
- String dir = System.getProperty("user.dir");
- LOG.info("Deleting zookeeper files in " + dir);
- File zookeeperDir = new File(dir + File.separator + "nullzookeeper");
- if (zookeeperDir.exists()) {
- File[] files = zookeeperDir.listFiles();
- for (File file : files) {
- if (file.isDirectory()) {
- File[] childFiles = file.listFiles();
- for (File childFile : childFiles) {
- LOG.info("Deleting zookeeper file - "
- + childFile.getAbsolutePath());
- childFile.delete();
- }
- } else {
- LOG.info("Deleting zookeeper file - " + file.getAbsolutePath());
- file.delete();
- }
- }
- zookeeperDir.delete();
+ IntWritable data = new IntWritable(5);
+ syncClient.storeInformation(
+ syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+ null);
+
+ ListenerTest listenerTest = new ListenerTest();
+
+ syncClient.registerListener(
+ syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+ ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+ IntWritable valueHolder = new IntWritable();
+ boolean result = syncClient
+ .getInformation(
+ syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+ valueHolder);
+ assertTrue(result);
+ int intVal = valueHolder.get();
+ assertTrue(intVal == data.get());
+
+ data.set(6);
+ syncClient.storeInformation(
+ syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+ null);
+ valueHolder = new IntWritable();
+ result = syncClient
+ .getInformation(
+ syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+ valueHolder);
+
+ assertTrue(result);
+ intVal = valueHolder.get();
+ assertTrue(intVal == data.get());
- }
- }
+ Thread.sleep(5000);
+
+ assertEquals(true, listenerTest.getValue().equals("Changed"));
+
+ syncServer.stopServer();
}