You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:48 UTC
[24/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java
deleted file mode 100644
index e2f033c..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/GemFireTimeSync.java
+++ /dev/null
@@ -1,720 +0,0 @@
-/**
- * An implementation of the Berkeley Algorithm for clock synchronization.
- * On view changes and otherwise at a configurable interval this protocol
- * sends a request for the current millisecond clock to all members. It
- * computes the average round-trip time and the average clock value, throwing
- * out samples outside the standard deviation. The result is then used to
- * compute and send clock offsets to each member.
- *
- * @author Bruce Schuchardt
- */
-package com.gemstone.org.jgroups.protocols;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Header;
-import com.gemstone.org.jgroups.JChannel;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.protocols.pbcast.GMS;
-import com.gemstone.org.jgroups.protocols.pbcast.GMS.GmsHeader;
-import com.gemstone.org.jgroups.spi.GFPeerAdapter;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-
-public class GemFireTimeSync extends Protocol {
-
- static private boolean DEBUG = Boolean.getBoolean("gemfire.time-service.debug");
-
- private int clockSyncInterval = 100; // every 100 seconds (clock drift of 1 ms per second)
- private int replyWaitInterval = 15; // fifteen second wait before timing out
- private long currCoordOffset = 0;
-
- private Address localAddress = null;
- private volatile View view;
- private final AtomicLong nextProcId = new AtomicLong(0);
- private final ConcurrentMap<Long, ReplyProcessor> processors = new ConcurrentHashMap<Long, ReplyProcessor>();
- private final ConcurrentMap<Address, GFTimeSyncHeader> joinTimeOffsets = new ConcurrentHashMap<Address, GFTimeSyncHeader>();
-
- private ServiceThread syncThread;
- // bug #50833 - use a different object to sync on creation of the syncThread
- private final Object syncThreadLock = new Object();
-
- // Test hook for unit testing.
- private TestHook testHook;
-
- private long joinReqTime = 0;
- public static final int TIME_RESPONSES = 0;
- public static final int OFFSET_RESPONSE = 1;
-
- @Override
- public String getName() {
- return "GemFireTimeSync";
- }
-
- /**
- * This method holds most of the logic for this protocol. The rest of the
- * class is composed of utility calculation methods and messaging infrastructure.
- * Here we request the current clock from each member, wait for the replies,
- * calculate the distributed time using the Berkeley Algorithm and then send
- * individual time offsets to each member including this JVM.
- */
- synchronized void computeAndSendOffsets(View v) {
-
- if (v.getMembers().size() < 2) {
- return;
- }
- // send a message containing a reply processor ID and the current time.
- // Others will respond with their own time
- long currentTime = System.currentTimeMillis();
- long procID = nextProcId.incrementAndGet();
- GFTimeSyncHeader timeHeader = new GFTimeSyncHeader(procID, GFTimeSyncHeader.OP_TIME_REQUEST, currentTime);
- ReplyProcessor proc = new ReplyProcessor(view, procID);
- processors.put(procID, proc);
-
- // specify use of UNICAST by setting the message destination. By doing
- // this we can take advantage of the isHighPriority flag for OOB messaging
- // and avoid cache traffic that might be clogging up the regular send/receive windows
- try {
- for (Iterator<?> it = v.getMembers().iterator(); it.hasNext();) {
- Address mbr = (Address)it.next();
- if (!mbr.equals(this.localAddress)) {
- // JGroups requires a different message for each destination but the
- // header can be reused
- Message timeMessage = new Message();
- timeMessage.setDest(mbr);
- timeMessage.isHighPriority = true;
- timeMessage.putHeader(getName(), timeHeader);
- passDown(new Event(Event.MSG, timeMessage));
- }
- }
- GFTimeSyncHeader myResponse = new GFTimeSyncHeader(0, (byte)0, currentTime);
- proc.replyReceived(this.localAddress, myResponse);
- proc.waitForReplies(replyWaitInterval * 1000);
- } catch (InterruptedException e) {
- return;
- } finally {
- if (testHook != null) {
- testHook.setResponses(proc.responses, currentTime);
- testHook.hook(TIME_RESPONSES);
- }
- processors.remove(procID);
- }
-
- Map<Address, GFTimeSyncHeader> responses = proc.responses;
- int numResponses = responses.size();
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "Received " + numResponses + " responses");
- }
-
- if (numResponses > 1) {
-
- // now compute the average round-trip time and the average clock time,
- // throwing out values outside of the standard deviation for each
-
- long averageRTT = getMeanRTT(responses, 0, Long.MAX_VALUE);
- long rTTStddev = getRTTStdDev(responses, averageRTT);
- // now recompute the average throwing out ones that are way off
- long newAverageRTT = getMeanRTT(responses, averageRTT, rTTStddev);
- if (newAverageRTT > 0) {
- averageRTT = newAverageRTT;
- }
-
- long averageTime = getMeanClock(responses, 0, Long.MAX_VALUE);
- long stddev = getClockStdDev(responses, averageTime);
- long newAverageTime = getMeanClock(responses, averageTime, stddev);
- if (newAverageTime > 0) {
- averageTime = newAverageTime;
- }
-
- long averageTransmitTime = averageRTT / 2;
- long adjustedAverageTime = averageTime + averageTransmitTime;
-
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- StringBuilder buffer = new StringBuilder(5000);
- for (Map.Entry<Address,GFTimeSyncHeader> entry: responses.entrySet()) {
- buffer.append("\n\t").append(entry.getKey()).append(": ").append(entry.getValue());
- }
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service computed "
- + "round trip time of " + averageRTT + " with stddev of " + rTTStddev + " and "
- + "clock time of " + averageTime + " with stddev of " + stddev
- + " for " + numResponses + " members. Details: \n\tstart time="
- + currentTime + " group time=" + adjustedAverageTime + " transmit time=" + averageTransmitTime
- + buffer.toString());
- }
-
- // TODO: should all members on the same machine get the same time offset?
-
- for (Iterator<Map.Entry<Address, GFTimeSyncHeader>> it = responses.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<Address, GFTimeSyncHeader> entry = it.next();
- IpAddress mbr = (IpAddress)entry.getKey();
- GFTimeSyncHeader response = entry.getValue();
- Message offsetMessage = new Message();
- offsetMessage.setDest(mbr);
- offsetMessage.isHighPriority = true;
-
- long responseTransmitTime = (response.timeReceived - currentTime) / 2;
- long offset = adjustedAverageTime - (response.time + responseTransmitTime);
-
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "sending time offset of " + offset + " to " + entry.getKey()
- + " whose time was " + response.time + " and transmit time was " + responseTransmitTime);
- }
-
- offsetMessage.putHeader(getName(), new GFTimeSyncHeader(0, GFTimeSyncHeader.OP_TIME_OFFSET, offset));
- if (mbr == this.localAddress) {
- // We need to cache offset here too just for co-ordinator.
- currCoordOffset = offset;
- offsetMessage.setSrc(this.localAddress);
- up(new Event(Event.MSG, offsetMessage));
- } else {
- passDown(new Event(Event.MSG, offsetMessage));
- }
- }
- }
- }
-
-
- @Override
- public void up(Event event) {
- switch (event.getType()) {
- case Event.SET_LOCAL_ADDRESS:
- this.localAddress = (Address)event.getArg();
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GF time service setting local address to " + this.localAddress);
- }
- break;
- case Event.MSG:
- Message msg = (Message)event.getArg();
- GFTimeSyncHeader header = (GFTimeSyncHeader)msg.removeHeader(getName());
- if (header != null) {
- switch (header.opType){
- case GFTimeSyncHeader.JOIN_TIME_REQUEST:
- long beforeJoinTime = System.currentTimeMillis() + currCoordOffset;
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received join time offset request from " + msg.getSrc() + " join time=" + beforeJoinTime);
- }
-
- // Store in a map and wait for JOIN reply GMS message.
- GFTimeSyncHeader respHeader = new GFTimeSyncHeader(0, GFTimeSyncHeader.JOIN_RESPONSE_OFFSET, currCoordOffset, beforeJoinTime, 0);
- joinTimeOffsets.put(msg.getSrc(), respHeader);
- break;
- case GFTimeSyncHeader.JOIN_RESPONSE_OFFSET:
- if (header.coordTimeAfterJoin == 0) { // member sending offset is using old version - ignore it
- break;
- }
- long currentLocalTime = System.currentTimeMillis();
-
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(
- ExternalStrings.DEBUG,
- " currentLocalTime =" + currentLocalTime
- + " coordAfterJoinTime = " + header.coordTimeAfterJoin
- + " coordBeforeJoinTime = " + header.coordTimeBeforeJoin
- + " joinReqTime = " + joinReqTime);
- }
- long transmissionTime = ((currentLocalTime - (header.coordTimeAfterJoin - header.coordTimeBeforeJoin)) - joinReqTime)/2;
- long timeOffs = header.coordTimeBeforeJoin - (joinReqTime + transmissionTime);
-
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received join time offset from " + msg.getSrc() + " offset=" + timeOffs);
- }
- GFPeerAdapter mgr = stack.gfPeerFunctions;
- if (mgr != null) {
- // give the time to the manager who can install it in gemfire
- mgr.setCacheTimeOffset(msg.getSrc(), timeOffs, true);
- }
- break;
- case GFTimeSyncHeader.OP_TIME_REQUEST:
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time request from " + msg.getSrc());
- }
- GFTimeSyncHeader responseHeader = new GFTimeSyncHeader(header.procID, GFTimeSyncHeader.OP_TIME_RESPONSE, System.currentTimeMillis());
- Message response = new Message();
- response.setDest(msg.getSrc());
- response.putHeader(getName(), responseHeader);
- response.isHighPriority = true;
- passDown(new Event(Event.MSG, response));
- return;
- case GFTimeSyncHeader.OP_TIME_RESPONSE:
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time response from " + msg.getSrc());
- }
- ReplyProcessor p = processors.get(new Long(header.procID));
- if (p != null) {
- p.replyReceived(msg.getSrc(), header);
- }
- return;
- case GFTimeSyncHeader.OP_TIME_OFFSET:
- long timeOffset = header.time;
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service received time offset update from " + msg.getSrc() + " offset=" + timeOffset);
- }
- GFPeerAdapter jmm = stack.gfPeerFunctions;
- if (jmm != null) {
- // give the time offset to the Distribution manager who can set it in GemfireCacheImpl.
- jmm.setCacheTimeOffset(msg.getSrc(), timeOffset, false);
- }
- if (testHook != null) {
- testHook.hook(OFFSET_RESPONSE);
- }
- return;
- }
- }
- }
- passUp(event);
- }
-
- /*private long getCurrTimeOffset() {
- JGroupMembershipManager mgr = stack.jgmm;
- if (mgr != null) {
- // give the time offset to the Distribution manager who can set it in GemfireCacheImpl.
- DistributedMembershipListener listener = mgr.getListener();
- if (listener != null) {
- DistributionManager dm = listener.getDM();
- return dm .getCacheTimeOffset();
- }
- }
- return 0;
- }*/
-
- @Override
- public void down(Event event) {
- switch (event.getType()) {
- case Event.VIEW_CHANGE:
- View view = (View)event.getArg();
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GF time service is processing view " + view);
- }
- viewChanged(view);
- break;
- case Event.MSG:
- Message msg = (Message)event.getArg();
- GMS.GmsHeader header = (GmsHeader) msg.getHeader(GMS.name);
-
- if (header != null) {
- if (header.getType() == GmsHeader.JOIN_REQ) {
- // Send Time Sync OFFSET request header in JOIN_REQ message.
- joinReqTime = System.currentTimeMillis();
- GFTimeSyncHeader timeHeader = new GFTimeSyncHeader(0, GFTimeSyncHeader.JOIN_TIME_REQUEST, joinReqTime);
- msg.putHeader(getName(), timeHeader);
- } else if (header.getType() == GmsHeader.JOIN_RSP) {
- // Send the time offset in JOIN_RSP response message.
- GFTimeSyncHeader joinTimeSyncHeader = joinTimeOffsets.remove(msg.getDest());
- if (joinTimeSyncHeader != null) {
- long afterJoinTime = System.currentTimeMillis() + currCoordOffset;
- joinTimeSyncHeader.coordTimeAfterJoin = afterJoinTime;
- msg.putHeader(getName(), joinTimeSyncHeader);
-
- if (DEBUG || log.getLogWriter().fineEnabled()) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "GemFire time service is including after-join time in join-response to " + msg.getDest() + " after join time=" + afterJoinTime);
- }
- }
- }
- }
- break;
- }
- passDown(event);
- }
-
- private void viewChanged(View newView) {
- this.view = (View) newView.clone();
- if (this.localAddress.equals(newView.getCoordinator())) {
- boolean newThread = false;
- synchronized(this.syncThreadLock) {
- if (this.syncThread == null) {
- this.syncThread = new ServiceThread(GemFireTracer.GROUP, "GemFire Time Service");
- this.syncThread.setDaemon(true);
- this.syncThread.start();
- newThread = true;
- }
- if (!newThread) {
- this.syncThread.computeOffsetsForNewView();
- }
- }
- } else {
- synchronized (this.syncThreadLock) {
- if (this.syncThread != null) {
- this.syncThread.cancel();
- }
- }
- }
- }
-
- /**
- * retrieves the average of the samples. This can be used with (samples, 0, Long.MAX_VALUE) to get
- * the initial mean and then (samples, lastResult, stddev) to get those within the standard deviation.
- * @param values
- * @param previousMean
- * @param stddev
- * @return the mean
- */
- private long getMeanRTT(Map<Address, GFTimeSyncHeader> values, long previousMean, long stddev) {
- long totalTime = 0;
- long numSamples = 0;
- long upperLimit = previousMean + stddev;
- for (GFTimeSyncHeader response: values.values()) {
- long rtt = response.timeReceived - response.time;
- if (rtt <= upperLimit) {
- numSamples++;
- totalTime += rtt;
- }
- }
- long averageTime = totalTime / numSamples;
- return averageTime;
- }
-
- private long getRTTStdDev(Map<Address, GFTimeSyncHeader> values, long average) {
- long sqDiffs = 0;
- for (GFTimeSyncHeader response: values.values()) {
- long diff = average - (response.timeReceived - response.time);
- sqDiffs += diff * diff;
- }
- return Math.round(Math.sqrt((double)sqDiffs));
- }
-
- /**
- * retrieves the average of the samples. This can be used with (samples, 0, Long.MAX_VALUE) to get
- * the initial mean and then (samples, lastResult, stddev) to get those within the standard deviation.
- * @param values
- * @param previousMean
- * @param stddev
- * @return the mean
- */
- private long getMeanClock(Map<Address, GFTimeSyncHeader> values, long previousMean, long stddev) {
- long totalTime = 0;
- long numSamples = 0;
- long upperLimit = previousMean + stddev;
- long lowerLimit = previousMean - stddev;
- for (GFTimeSyncHeader response: values.values()) {
- if (lowerLimit <= response.time && response.time <= upperLimit) {
- numSamples++;
- totalTime += response.time;
- }
- }
- long averageTime = totalTime / numSamples;
- return averageTime;
- }
-
- private long getClockStdDev(Map<Address, GFTimeSyncHeader> values, long average) {
- long sqDiffs = 0;
- for (GFTimeSyncHeader response: values.values()) {
- long diff = average - response.time;
- sqDiffs += diff * diff;
- }
- return Math.round(Math.sqrt((double)sqDiffs));
- }
-
- @Override
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
-
- str=props.getProperty("clock_sync_interval");
- if (str != null) {
- clockSyncInterval = Integer.parseInt(str);
- props.remove("clock_sync_interval");
- }
-
- str=props.getProperty("reply_wait_interval");
- if (str != null) {
- replyWaitInterval = Integer.parseInt(str);
- props.remove("reply_wait_interval");
- }
-
- if (props.size() > 0) {
- // this will not normally be seen by customers, even if there are unrecognized properties, because
- // jgroups error messages aren't displayed unless the debug flag is turned on
- log.error(ExternalStrings.DEBUG, "The following GemFireTimeSync properties were not recognized: " + props);
- return false;
- }
- return true;
- }
-
- @Override
- public void init() throws Exception {
- super.init();
- }
-
- @Override
- public void start() throws Exception {
- super.start();
- }
-
- @Override
- public void stop() {
- super.stop();
- if (this.syncThread != null) {
- this.syncThread.cancel();
- }
- }
-
- static class ReplyProcessor {
- int responderCount;
- long procID;
- Map<Address, GFTimeSyncHeader> responses = new HashMap<Address, GFTimeSyncHeader>();
- Object doneSync = new Object();
-
- ReplyProcessor(View view, long procID) {
- responderCount = view.getMembers().size();
- this.procID = procID;
- }
-
- void replyReceived(Address sender, GFTimeSyncHeader response) {
- response.timeReceived = System.currentTimeMillis();
- synchronized(responses) {
- responses.put(sender, response);
- }
- synchronized(doneSync) {
- if (responses.size() >= responderCount) {
- doneSync.notify();
- }
- }
- }
-
- boolean done() {
- synchronized(responses) {
- return responses.size() >= responderCount;
- }
- }
-
- void waitForReplies(long timeout) throws InterruptedException {
- synchronized(doneSync) {
- long endTime = System.currentTimeMillis() + timeout;
- while (!done()) {
- // compute remaining time in case of spurious wake-up
- long remainingTime = endTime - System.currentTimeMillis();
- if (remainingTime <= 0) {
- return;
- }
- doneSync.wait(remainingTime);
- }
- }
- }
- }
-
- public static class GFTimeSyncHeader extends Header implements Streamable {
- static final byte OP_TIME_REQUEST = 0;
- static final byte OP_TIME_RESPONSE = 1;
- static final byte OP_TIME_OFFSET = 2;
- static final byte JOIN_TIME_REQUEST = 3;
- static final byte JOIN_RESPONSE_OFFSET = 4;
-
- public long procID;
- public byte opType;
- public long time;
- public long coordTimeBeforeJoin;
- public long coordTimeAfterJoin;
- public transient long timeReceived; // set by ReplyProcessor when a response is received
-
- public GFTimeSyncHeader() {}
-
- public GFTimeSyncHeader(long procID, byte opType, long time) {
- super();
- this.procID = procID;
- this.opType = opType;
- this.time = time;
- }
-
- GFTimeSyncHeader(long procID, byte opType, long time, long beforeTime, long afterTime) {
- super();
- this.procID = procID;
- this.opType = opType;
- this.time = time;
- this.coordTimeBeforeJoin = beforeTime;
- this.coordTimeAfterJoin = afterTime;
- }
-
- @Override
- public long size(short version) {
- return super.size(version) + 17;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(this.procID);
- out.writeLong(this.time);
- out.write(this.opType);
- if (JChannel.getGfFunctions().isVersionForStreamAtLeast(out, JGroupsVersion.GFE_80_ORDINAL)) {
- out.writeLong(this.coordTimeBeforeJoin);
- out.writeLong(this.coordTimeAfterJoin);
- }
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- this.procID = in.readLong();
- this.time = in.readLong();
- this.opType = in.readByte();
- if (JChannel.getGfFunctions().isVersionForStreamAtLeast(in, JGroupsVersion.GFE_80_ORDINAL)) {
- this.coordTimeBeforeJoin = in.readLong();
- this.coordTimeAfterJoin = in.readLong();
- }
- }
-
- @Override
- public void writeTo(DataOutputStream out) throws IOException {
- out.writeLong(this.procID);
- out.writeLong(this.time);
- out.write(this.opType);
- if (JChannel.getGfFunctions().isVersionForStreamAtLeast(out, JGroupsVersion.GFE_80_ORDINAL)) {
- out.writeLong(this.coordTimeBeforeJoin);
- out.writeLong(this.coordTimeAfterJoin);
- }
- }
-
- @Override
- public void readFrom(DataInputStream in) throws IOException,
- IllegalAccessException, InstantiationException {
- this.procID = in.readLong();
- this.time = in.readLong();
- this.opType = in.readByte();
- if (JChannel.getGfFunctions().isVersionForStreamAtLeast(in, JGroupsVersion.GFE_80_ORDINAL)) {
- this.coordTimeBeforeJoin = in.readLong();
- this.coordTimeAfterJoin = in.readLong();
- }
- }
-
- @Override
- public String toString() {
- return "SyncMessage(procID=" + this.procID + "; op="
- + op2String(this.opType) + "; time=" + this.time + "; timeRcvd="
- + this.timeReceived + "; coordTimeBeforeJoin="
- + this.coordTimeBeforeJoin + "; coordTimeAfterJoin="
- + this.coordTimeAfterJoin + ")";
- }
-
- private String op2String(byte op) {
- switch (op) {
- case OP_TIME_REQUEST: return "REQUEST";
- case OP_TIME_RESPONSE: return "RESPONSE";
- case OP_TIME_OFFSET: return "OFFSET";
- case JOIN_TIME_REQUEST: return "JOIN_TIME_REQUEST";
- case JOIN_RESPONSE_OFFSET: return "JOIN_OFFSET";
- }
- return "??";
- }
- }
-
- private class ServiceThread extends Thread {
- private boolean cancelled;
- private boolean waiting; // true if waiting for next scheduled time
- private boolean skipWait; // true if the thread should begin processing as soon as it finishes current view
- private Object lock = new Object();
-
- ServiceThread(ThreadGroup g, String name) {
- super(g,name);
- }
-
- @Override
- public void run() {
- synchronized(this.lock) {
- this.cancelled = false;
- }
- while (!cancelled()) {
- View v = view;
- if (v != null && v.getCreator().equals(localAddress)) {
- computeAndSendOffsets(v);
- }
- try {
- synchronized(this.lock) {
- if (this.skipWait) {
- this.skipWait = false;
- } else {
- this.waiting = true;
- try {
- this.lock.wait(GemFireTimeSync.this.clockSyncInterval*1000);
- } finally {
- this.waiting = false;
- }
- }
- }
- } catch (InterruptedException e) {
- // ignore unless cancelled
- }
- }
- }
-
- public boolean cancelled() {
- synchronized(this.lock) {
- return this.cancelled;
- }
- }
-
- public void cancel() {
- synchronized(this.lock) {
- this.cancelled = true;
- this.lock.notifyAll();
- }
- }
-
- public void computeOffsetsForNewView() {
- synchronized(this.lock) {
- if (this.waiting) {
- this.lock.notifyAll();
- } else {
- this.skipWait = true;
- }
- }
- }
- }
-
-
- /**
- * Use only for unit testing.
- */
- public void invokeServiceThreadForTest() {
- if (this.syncThread != null) {
- this.syncThread.computeOffsetsForNewView();
- }
- }
-
- /**
- * Use only for unit testing.
- */
- public boolean isServiceThreadCancelledForTest() {
- if (this.syncThread != null) {
- return this.syncThread.cancelled();
- } else {
- return true;
- }
- }
-
- public TestHook getTestHook() {
- return testHook;
- }
-
- public void setTestHook(TestHook testHook) {
- this.testHook = testHook;
- }
-
- public interface TestHook {
-
- public void hook(int barrier);
-
- public void setResponses(Map<Address, GFTimeSyncHeader> responses, long currentTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java
deleted file mode 100644
index 6e719eb..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HDRS.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: HDRS.java,v 1.2 2004/03/30 06:47:21 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.Protocol;
-
-
-/**
- * Example of a protocol layer. Contains no real functionality, can be used as a template.
- */
-public class HDRS extends Protocol {
- @Override // GemStoneAddition
- public String getName() {return "HDRS";}
-
-
- private void printMessage(Message msg, String label) {
- System.out.println("------------------------- " + label + " ----------------------");
- System.out.println(msg);
- msg.printObjectHeaders();
- System.out.println("--------------------------------------------------------------");
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- if(evt.getType() == Event.MSG) {
- Message msg=(Message)evt.getArg();
- printMessage(msg, "up");
- }
- passUp(evt); // Pass up to the layer above us
- }
-
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- if(evt.getType() == Event.MSG) {
- Message msg=(Message)evt.getArg();
- printMessage(msg, "down");
- }
-
- passDown(evt); // Pass on to the layer below us
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java
deleted file mode 100644
index e1583a1..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/HTOTAL.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: HTOTAL.java,v 1.4 2005/09/01 11:41:00 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * Implementation of UTO-TCP as designed by EPFL. Implements chaining algorithm: each sender sends the message
- * to a coordinator who then forwards it to its neighbor on the right, who then forwards it to its neighbor to the right
- * etc.
- * @author Bela Ban
- * @version $Id: HTOTAL.java,v 1.4 2005/09/01 11:41:00 belaban Exp $
- */
-public class HTOTAL extends Protocol {
- Address coord=null;
- Address neighbor=null; // to whom do we forward the message (member to the right, or null if we're at the tail)
- Address local_addr=null;
- Vector mbrs=new Vector();
-// boolean is_coord=false; GemStoneAddition
- private boolean use_multipoint_forwarding=false;
-
-
-
-
- public HTOTAL() {
- }
-
- @Override // GemStoneAddition
- public final String getName() {
- return "HTOTAL";
- }
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("use_multipoint_forwarding");
- if(str != null) {
- use_multipoint_forwarding=Boolean.valueOf(str).booleanValue();
- props.remove("use_multipoint_forwarding");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.HTOTAL_TCPSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- switch(evt.getType()) {
- case Event.VIEW_CHANGE:
- determineCoordinatorAndNextMember((View)evt.getArg());
- break;
- case Event.MSG:
- Message msg=(Message)evt.getArg();
- Address dest=msg.getDest();
- if(dest == null || dest.isMulticastAddress()) { // only process multipoint messages
- if(coord == null)
- log.error(ExternalStrings.HTOTAL_COORDINATOR_IS_NULL_CANNOT_SEND_MESSAGE_TO_COORDINATOR);
- else {
- msg.setSrc(local_addr);
- forwardTo(coord, msg);
- }
- return; // handled here, don't pass down by default
- }
- break;
- }
- passDown(evt);
- }
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- switch(evt.getType()) {
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
- case Event.VIEW_CHANGE:
- determineCoordinatorAndNextMember((View)evt.getArg());
- break;
- case Event.MSG:
- Message msg=(Message)evt.getArg();
- HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName());
-
- if(hdr == null)
- break; // probably a unicast message, just pass it up
-
- Message copy=msg.copy(false); // do not copy the buffer
- if(use_multipoint_forwarding) {
- copy.setDest(null);
- passDown(new Event(Event.MSG, copy));
- }
- else {
- if(neighbor != null) {
- forwardTo(neighbor, copy);
- }
- }
-
- msg.setDest(hdr.dest); // set destination to be the original destination
- msg.setSrc(hdr.src); // set sender to be the original sender (important for retransmission requests)
-
- passUp(evt); // <-- we modify msg directly inside evt
- return;
- }
- passUp(evt);
- }
-
- private void forwardTo(Address destination, Message msg) {
- HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName());
-
- if(hdr == null) {
- hdr=new HTotalHeader(msg.getDest(), msg.getSrc());
- msg.putHeader(getName(), hdr);
- }
- msg.setDest(destination);
- if(trace)
- log.trace("forwarding message to " + destination + ", hdr=" + hdr);
- passDown(new Event(Event.MSG, msg));
- }
-
-
- private void determineCoordinatorAndNextMember(View v) {
- Object tmp;
- Address retval=null;
-
- mbrs.clear();
- mbrs.addAll(v.getMembers());
-
- coord=(Address)(/* mbrs != null && GemStoneAddition (cannot be null) */ mbrs.size() > 0? mbrs.firstElement() : null);
-// is_coord=coord != null && local_addr != null && coord.equals(local_addr); GemStoneAddition
-
- if(/* mbrs == null || GemStoneAddition (cannot be null) */ mbrs.size() < 2 || local_addr == null)
- neighbor=null;
- else {
- for(int i=0; i < mbrs.size(); i++) {
- tmp=mbrs.elementAt(i);
- if(local_addr.equals(tmp)) {
- if(i + 1 >= mbrs.size()) {
-// retval=null; // we don't wrap, last member is null GemStoneAddition (redundant assignment)
- }
- else
- retval=(Address)mbrs.elementAt(i + 1);
- break;
- }
- }
- }
- neighbor=retval;
- if(trace)
- log.trace("coord=" + coord + ", neighbor=" + neighbor);
- }
-
-
- public static class HTotalHeader extends Header implements Streamable {
- Address dest, src;
-
- public HTotalHeader() {
- }
-
- public HTotalHeader(Address dest, Address src) {
- this.dest=dest;
- this.src=src;
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(dest);
- out.writeObject(src);
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- dest=(Address)in.readObject();
- src=(Address)in.readObject();
- }
-
- public void writeTo(DataOutputStream out) throws IOException {
- Util.writeAddress(dest, out);
- Util.writeAddress(src, out);
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- dest=Util.readAddress(in);
- src=Util.readAddress(in);
- }
-
- @Override // GemStoneAddition
- public String toString() {
- return "dest=" + dest + ", src=" + src;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java
deleted file mode 100644
index 407ddeb..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOOPBACK.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: LOOPBACK.java,v 1.16 2005/08/26 12:26:33 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.Util;
-
-
-/**
- Makes copies of outgoing messages, swaps sender and receiver and sends the message back up the stack.
- */
-public class LOOPBACK extends Protocol {
- private Address local_addr=null;
- private String group_addr=null;
-
- public LOOPBACK() {
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- return "Protocol LOOPBACK(local address: " + local_addr + ')';
- }
-
-
-
-
- /*------------------------------ Protocol interface ------------------------------ */
-
- @Override // GemStoneAddition
- public String getName() {
- return "LOOPBACK";
- }
-
-
-
- @Override // GemStoneAddition
- public void init() throws Exception {
-// local_addr=new IpAddress("localhost", 10000) { // fake address
-// public String toString() {
-// return "<fake>";
-// }
-// };
-
- //local_addr=new org.jgroups.stack.IpAddress("localhost", 10000); // fake address
- local_addr = new IpAddress(12345);
- }
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
- }
-
-
- /**
- * Caller by the layer above this layer. Usually we just put this Message
- * into the send queue and let one or more worker threads handle it. A worker thread
- * then removes the Message from the send queue, performs a conversion and adds the
- * modified Message to the send queue of the layer below it, by calling Down).
- */
- @Override // GemStoneAddition
- public void down(Event evt) {
- if(trace)
- log.trace("event is " + evt + ", group_addr=" + group_addr +
- ", time is " + System.currentTimeMillis() + ", hdrs: " + Util.printEvent(evt));
-
- switch(evt.getType()) {
-
- case Event.MSG:
- Message msg=(Message)evt.getArg();
- Message rsp=msg.copy();
- if(rsp.getSrc() == null)
- rsp.setSrc(local_addr);
-
- //dest_addr=msg.getDest();
- //rsp.setDest(local_addr);
- //rsp.setSrc(dest_addr != null ? dest_addr : local_addr);
- up(new Event(Event.MSG, rsp));
- break;
-
- case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
- passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
- break;
-
- case Event.CONNECT:
- group_addr=(String)evt.getArg();
- passUp(new Event(Event.CONNECT_OK));
- break;
-
- case Event.DISCONNECT:
- passUp(new Event(Event.DISCONNECT_OK));
- break;
-
- case Event.PERF:
- passUp(evt);
- break;
- }
- }
-
-
-
- /*--------------------------- End of Protocol interface -------------------------- */
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java
deleted file mode 100644
index 8fc8f44..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/LOSS.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: LOSS.java,v 1.3 2004/09/23 16:29:41 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.Vector;
-
-
-/**
- * Example of a protocol layer. Contains no real functionality, can be used as a template.
- */
-
-public class LOSS extends Protocol {
- final Vector members=new Vector();
- static/*GemStoneAddition*/ final long i=0;
- boolean drop_next_msg=false;
-
- /** All protocol names have to be unique ! */
- @Override // GemStoneAddition
- public String getName() {return "LOSS";}
-
-
-
- /** Just remove if you don't need to reset any state */
- public void reset() {}
-
-
-
-
-// public void up(Event evt) {
-// Message msg;
-
-// switch(evt.getType()) {
-
-// case Event.MSG:
-// msg=(Message)evt.getArg();
-// if(msg.getDest() != null && !((Address)msg.getDest()).isMulticastAddress()) {
-// // System.err.println("LOSS.up(): not dropping msg as it is unicast !");
-// break;
-// }
-
-// i++;
-
-// int r=((int)(Math.random() * 1000)) % 10;
-
-// if(r != 0 && i % r == 0) { // drop
-// System.out.println("####### LOSS.up(): dropping message " +
-// Util.printEvent(evt));
-// return;
-// }
-
-// break;
-// }
-
-// passUp(evt); // Pass up to the layer above us
-// }
-
-
-
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- Message msg;
-
- switch(evt.getType()) {
-
- case Event.TMP_VIEW:
- case Event.VIEW_CHANGE:
- Vector new_members=((View)evt.getArg()).getMembers();
- synchronized(members) {
- members.removeAllElements();
- if(new_members != null && new_members.size() > 0)
- for(int i=0; i < new_members.size(); i++)
- members.addElement(new_members.elementAt(i));
- }
- passDown(evt);
- break;
-
- case Event.MSG:
- if(drop_next_msg) {
- drop_next_msg=false;
- msg=(Message)evt.getArg();
-
- if(msg.getDest() != null && !msg.getDest().isMulticastAddress()) {
- break;
- }
-
-
- System.out.println("###### LOSS.down(): dropping msg " + Util.printMessage(msg));
-
- return;
- }
- break;
-
- case Event.DROP_NEXT_MSG:
- drop_next_msg=true;
- break;
- }
-
-
-
- passDown(evt); // Pass on to the layer below us
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java
deleted file mode 100644
index 7d2b6d7..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MERGE.java,v 1.10 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.stack.RouterStub;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * Simple and stupid MERGE protocol (does not take into account state transfer).
- * Periodically mcasts a HELLO message with its own address. When a HELLO message is
- * received from a member that has the same group (UDP discards all messages with a group
- * name different that our own), but is not currently in the group, a MERGE event is sent
- * up the stack. The protocol starts working upon receiving a View in which it is the coordinator.
- *
- * @author Gianluca Collot, Jan 2001
- */
-public class MERGE extends Protocol implements Runnable {
- final Vector members=new Vector();
- Address local_addr=null;
- String group_addr=null;
- final String groupname=null;
-
- // GemStoneAddition: access hello_thread synchronized on this
- Thread hello_thread=null; // thread that periodically mcasts HELLO messages
- long timeout=5000; // timeout between mcasting of HELLO messages
-
- String router_host=null;
- int router_port=0;
-
- RouterStub client=null;
- boolean is_server=false;
- boolean is_coord=false;
- boolean merging=false;
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "MERGE";
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("timeout"); // max time to wait for initial members
- if(str != null) {
- timeout=Long.parseLong(str);
- props.remove("timeout");
- }
-
- str=props.getProperty("router_host"); // host to send gossip queries (if gossip enabled)
- if(str != null) {
- router_host=str;
- props.remove("router_host");
- }
-
- str=props.getProperty("router_port");
- if(str != null) {
- router_port=Integer.parseInt(str);
- props.remove("router_port");
- }
-
- if(router_host != null && router_port != 0)
- client=new RouterStub(router_host, router_port);
-
- if(props.size() > 0) {
- log.error(ExternalStrings.MERGE_MERGESETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- synchronized (this) { // GemStoneAddition
- if(hello_thread == null) {
- hello_thread=new Thread(this, "MERGE Thread");
- hello_thread.setDaemon(true);
- hello_thread.start();
- }
- }
- }
-
-
- @Override // GemStoneAddition
- public void stop() {
- Thread tmp;
- synchronized (this) { // GemStoneAddition
- tmp = hello_thread;
- hello_thread = null;
- }
- if(tmp != null && tmp.isAlive()) {
-// tmp=hello_thread;
-// hello_thread=null;
- tmp.interrupt();
- try {
- tmp.join(1000);
- }
- catch(InterruptedException ex) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- }
- }
-// hello_thread=null; GemStoneAddition
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg;
- Object obj;
- MergeHeader hdr;
- Address sender;
- boolean contains;
- Vector tmp;
-
-
- switch(evt.getType()) {
-
- case Event.MSG:
- msg=(Message)evt.getArg();
- obj=msg.getHeader(getName());
- if(obj == null || !(obj instanceof MergeHeader)) {
- passUp(evt);
- return;
- }
- hdr=(MergeHeader)msg.removeHeader(getName());
-
- switch(hdr.type) {
-
- case MergeHeader.HELLO: // if coord: handle, else: discard
- if(!is_server || !is_coord) {
- return;
- }
- if(merging) {
- return;
- }
- sender=msg.getSrc();
- if((sender != null) && (members.size() >= 0)) {
- synchronized(members) {
- contains=members.contains(sender);
- }
- //merge only with lower addresses :prevents cycles and ensures that the new coordinator is correct.
- if(!contains && sender.compareTo(local_addr) < 0) {
- if(log.isInfoEnabled())
- log.info("membership " + members +
- " does not contain " + sender + "; merging it");
- tmp=new Vector();
- tmp.addElement(sender);
- merging=true;
- passUp(new Event(Event.MERGE, tmp));
- }
- }
- return;
-
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE_GOT_MERGE_HDR_WITH_UNKNOWN_TYPE_0, hdr.type);
- return;
- }
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- passUp(evt);
- break;
-
- default:
- passUp(evt); // Pass up to the layer above us
- break;
- }
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
-
- switch(evt.getType()) {
-
- case Event.TMP_VIEW:
- passDown(evt);
- break;
-
- case Event.MERGE_DENIED:
- merging=false;
- passDown(evt);
- break;
-
- case Event.VIEW_CHANGE:
- merging=false;
- synchronized(members) {
- members.clear();
- members.addAll(((View)evt.getArg()).getMembers());
- if(/* (members == null) || GemStoneAddition (cannot be null) */ (members.size() == 0)) {
- if(log.isFatalEnabled()) log.fatal("received VIEW_CHANGE with null or empty vector");
- //System.exit(6);
- }
- }
- is_coord=members.elementAt(0).equals(local_addr);
- passDown(evt);
- if(is_coord) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.MERGE_START_SENDING_HELLOS);
- try {
- start();
- }
- catch(Exception ex) {
- if(warn) log.warn("exception calling start(): " + ex);
- }
- }
- else {
- if(log.isInfoEnabled()) log.info(ExternalStrings.MERGE_STOP_SENDING_HELLOS);
- stop();
- }
- break;
-
- case Event.BECOME_SERVER: // called after client has join and is fully working group member
- passDown(evt);
- try {
- start();
- is_server=true;
- }
- catch(Exception ex) {
- if(warn) log.warn("exception calling start(): " + ex);
- }
- break;
-
- case Event.CONNECT:
- group_addr=(String)evt.getArg();
- passDown(evt);
- break;
-
- case Event.DISCONNECT:
- if(local_addr != null && evt.getArg() != null && local_addr.equals(evt.getArg()))
- stop();
- passDown(evt);
- break;
-
- default:
- passDown(evt); // Pass on to the layer below us
- break;
- }
- }
-
-
- /**
- * If IP multicast: periodically mcast a HELLO message
- * If gossiping: periodically retrieve the membership. Any members not part of our
- * own membership are merged (passing MERGE event up).
- */
- public void run() {
- Message hello_msg;
- MergeHeader hdr;
- List rsps;
- Vector members_to_merge=new Vector(), tmp;
- Object mbr;
-
-
- try {
- Thread.sleep(3000);
- } /// initial sleep; no premature merging
- catch (InterruptedException e) { // GemStoneAddition
- return; // exit thread, no need to reset interrupt
- }
-
-
- for (;;) { // GemStoneAddition remove coding anti-pattern
- try { // GemStoneAddition
- Util.sleep(timeout);
- }
- catch (InterruptedException e) {
- break; // exit loop and thread, no need to reset interrupt
- }
-// if(hello_thread == null) break; GemStoneAddition
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition - for safety
-
- if(client == null) { // plain IP MCAST
- hello_msg=new Message(null, null, null);
- hdr=new MergeHeader(MergeHeader.HELLO);
- hello_msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, hello_msg));
- }
- else { // gossiping; contact Router
- rsps=client.get(group_addr);
-
- synchronized(members) {
- members_to_merge.removeAllElements();
-
- for(Enumeration e=rsps.elements(); e.hasMoreElements();) {
- mbr=e.nextElement();
- if(!members.contains(mbr)) {
-
- if(log.isInfoEnabled())
- log.info("membership " + members +
- " does not contain " + mbr + "; merging it");
-
- members_to_merge.addElement(mbr);
- }
- }
- if(members_to_merge.size() > 0) {
- Membership new_membership=new Membership(members_to_merge);
- new_membership.sort();
- Address coord=(Address)new_membership.elementAt(0);
- tmp=new Vector();
- tmp.addElement(coord);
- if(coord.compareTo(local_addr) < 0)
- passUp(new Event(Event.MERGE, tmp));
- }
- }
- }
- }
- }
-
-
-
-
-
-
- /* -------------------------- Private methods ---------------------------- */
-
-
- public static class MergeHeader extends Header {
- public static final int HELLO=1; // arg = null
-
- public int type=0;
-
- public MergeHeader() {
- } // used for externalization
-
- public MergeHeader(int type) {
- this.type=type;
- }
-
- @Override // GemStoneAddition
- public String toString() {
- return "[MERGE: type=" + type2Str(type) + ']';
- }
-
- String type2Str(int t) {
- switch(t) {
- case HELLO:
- return "HELLO";
- default:
- return "<unkown type (" + t + ")>";
- }
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readInt();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java
deleted file mode 100644
index dd37b68..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE2.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MERGE2.java,v 1.25 2005/10/04 15:47:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Promise;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group
- * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
- * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
- * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
- * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
- * somewhere above this protocol (typically in the GMS protocol).<p>
- * This protocol works as follows:
- * <ul>
- * <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g.
- * by PING or TCPPING protocols. This list contains {coord,addr} pairs.
- * <li>If there is more than 1 coordinator:
- * <ol>
- * <li>Get all coordinators
- * <li>Create a MERGE event with the list of coordinators as argument
- * <li>Send the event up the stack
- * </ol>
- * </ul>
- *
- * <p>
- *
- * Requires: FIND_INITIAL_MBRS event from below<br>
- * Provides: sends MERGE event with list of coordinators up the stack<br>
- * @author Bela Ban, Oct 16 2001
- */
-public class MERGE2 extends Protocol {
- Address local_addr=null;
- FindSubgroups task=null; // task periodically executing as long as we are coordinator
- private final Object task_lock=new Object();
- long min_interval=5000; // minimum time between executions of the FindSubgroups task
- long max_interval=20000; // maximum time between executions of the FindSubgroups task
- boolean is_coord=false;
- final Promise find_promise=new Promise(); // to synchronize FindSubgroups.findInitialMembers() on
-
- /** Use a new thread to send the MERGE event up the stack */
- boolean use_separate_thread=false;
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "MERGE2";
- }
-
- public long getMinInterval() {
- return min_interval;
- }
-
- public void setMinInterval(long i) {
- min_interval=i;
- }
-
- public long getMaxInterval() {
- return max_interval;
- }
-
- public void setMaxInterval(long l) {
- max_interval=l;
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("min_interval");
- if(str != null) {
- min_interval=Long.parseLong(str);
- props.remove("min_interval");
- }
-
- str=props.getProperty("max_interval");
- if(str != null) {
- max_interval=Long.parseLong(str);
- props.remove("max_interval");
- }
-
- if(min_interval <= 0 || max_interval <= 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE2_MIN_INTERVAL_AND_MAX_INTERVAL_HAVE_TO_BE__0);
- return false;
- }
- if(max_interval <= min_interval) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE2_MAX_INTERVAL_HAS_TO_BE_GREATER_THAN_MIN_INTERVAL);
- return false;
- }
-
- str=props.getProperty("use_separate_thread");
- if(str != null) {
- use_separate_thread=Boolean.valueOf(str).booleanValue();
- props.remove("use_separate_thread");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.MERGE2_MERGE2SETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public Vector requiredDownServices() {
- Vector retval=new Vector(1);
- retval.addElement(Integer.valueOf(Event.FIND_INITIAL_MBRS));
- return retval;
- }
-
-
- @Override // GemStoneAddition
- public void stop() {
- is_coord=false;
- stopTask();
- }
-
-
- /**
- * This prevents the up-handler thread to be created, which is not needed in the protocol.
- * DON'T REMOVE !
- */
- @Override // GemStoneAddition
- public void startUpHandler() {
- }
-
-
- /**
- * This prevents the down-handler thread to be created, which is not needed in the protocol.
- * DON'T REMOVE !
- */
- @Override // GemStoneAddition
- public void startDownHandler() {
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- passUp(evt);
- break;
-
- case Event.FIND_INITIAL_MBRS_OK:
- find_promise.setResult(evt.getArg());
- passUp(evt); // could be needed by GMS
- break;
-
- default:
- passUp(evt); // Pass up to the layer above us
- break;
- }
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- Vector mbrs;
- Address coord;
-
- switch(evt.getType()) {
-
- case Event.VIEW_CHANGE:
- passDown(evt);
- mbrs=((View)evt.getArg()).getMembers();
- if(mbrs == null || mbrs.size() == 0 || local_addr == null) {
- stopTask();
- break;
- }
- coord=(Address)mbrs.elementAt(0);
- if(coord.equals(local_addr)) {
- is_coord=true;
- startTask(); // start task if we became coordinator (doesn't start if already running)
- }
- else {
- // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone
- // else becomes the new coordinator of the merged group
- if(is_coord) {
- is_coord=false;
- }
- stopTask();
- }
- break;
-
- default:
- passDown(evt); // Pass on to the layer below us
- break;
- }
- }
-
-
- /* -------------------------------------- Private Methods --------------------------------------- */
- void startTask() {
- synchronized(task_lock) {
- if(task == null)
- task=new FindSubgroups();
- task.start();
- }
- }
-
- void stopTask() {
- synchronized(task_lock) {
- if(task != null) {
- task.stop(); // will cause timer to remove task from execution schedule
- task=null;
- }
- }
- }
- /* ---------------------------------- End of Private Methods ------------------------------------ */
-
-
-
-
- /**
- * Task periodically executing (if role is coordinator). Gets the initial membership and determines
- * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event
- * with the list of the coordinators up the stack
- */
- protected/*GemStoneAddition*/ class FindSubgroups implements Runnable {
- // GemStoneAddition: #thread must be synchronized on this
- Thread thread=null;
-
-
- public synchronized /* GemStoneAddition */ void start() {
- if(thread == null || !thread.isAlive()) {
- thread=new Thread(this, "MERGE2.FindSubgroups thread");
- thread.setDaemon(true);
- thread.start();
- }
- }
-
-
- public synchronized /* GemStoneAddition */ void stop() {
- if(thread != null) {
- Thread tmp=thread;
- thread=null;
- tmp.interrupt(); // wakes up sleeping thread
- find_promise.reset();
- }
- thread=null;
- }
-
-
- public void run() {
- long interval;
- Vector coords;
- Vector initial_mbrs;
-
- // if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator");
- for (;;) { // GemStoneAddition remove coding anti-pattern
- interval=computeInterval();
- try { // GemStoneAddition
- Util.sleep(interval);
- }
- catch (InterruptedException e) {
- break; // exits thread
- }
-// if(thread == null) break; GemStoneAddition
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- initial_mbrs=findInitialMembers();
-// if(thread == null) break; GemStoneAddition
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- if(log.isDebugEnabled()) log.debug("initial_mbrs=" + initial_mbrs);
- coords=detectMultipleCoordinators(initial_mbrs);
- if(coords != null && coords.size() > 1) {
- if(log.isDebugEnabled())
- log.debug("found multiple coordinators: " + coords + "; sending up MERGE event");
- final Event evt=new Event(Event.MERGE, coords);
- if(use_separate_thread) {
- Thread merge_notifier=new Thread() {
- @Override // GemStoneAddition
- public void run() {
- passUp(evt);
- }
- };
- merge_notifier.setDaemon(true);
- merge_notifier.setName("merge notifier thread");
- merge_notifier.start();
- }
- else {
- passUp(evt);
- }
- }
- }
- if(trace)
- log.trace("MERGE2.FindSubgroups thread terminated (local_addr=" + local_addr + ")");
- }
-
-
- /**
- * Returns a random value within [min_interval - max_interval]
- */
- long computeInterval() {
- return min_interval + Util.random(max_interval - min_interval);
- }
-
-
- /**
- * Returns a list of PingRsp pairs.
- */
- Vector findInitialMembers() {
- PingRsp tmp=new PingRsp(local_addr, local_addr, true);
- find_promise.reset();
- passDown(Event.FIND_INITIAL_MBRS_EVT);
- Vector retval=(Vector)find_promise.getResult(0); // wait indefinitely until response is received
- if(retval != null && is_coord && local_addr != null && !retval.contains(tmp))
- retval.add(tmp);
- return retval;
- }
-
-
- /**
- * Finds out if there is more than 1 coordinator in the initial_mbrs vector (contains PingRsp elements).
- * @param initial_mbrs A list of PingRsp pairs
- * @return Vector A list of the coordinators (Addresses) found. Will contain just 1 element for a correct
- * membership, and more than 1 for multiple coordinators
- */
- Vector detectMultipleCoordinators(Vector initial_mbrs) {
- Vector ret=new Vector(11);
- PingRsp rsp;
- Address coord;
-
- if(initial_mbrs == null) return null;
- for(int i=0; i < initial_mbrs.size(); i++) {
- rsp=(PingRsp)initial_mbrs.elementAt(i);
- if(!rsp.is_server)
- continue;
- coord=rsp.getCoordAddress();
- if(!ret.contains(coord))
- ret.addElement(coord);
- }
-
- return ret;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java
deleted file mode 100644
index b71da43..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGE3.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MERGE3.java,v 1.8 2005/08/08 12:45:43 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.*;
-
-
-
-
-/**
- * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group
- * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
- * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
- * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
- * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
- * somewhere above this protocol (typically in the GMS protocol).<p>
- * This protocol works as follows:
- * <ul>
- * <li>If coordinator: periodically broadcast a "I'm the coordinator" message. If a coordinator receives such
- * a message, it immediately initiates a merge by sending up a MERGE event
- * <p>
- *
- * Provides: sends MERGE event with list of coordinators up the stack<br>
- * @author Bela Ban, Oct 16 2001
- */
-public class MERGE3 extends Protocol {
- Address local_addr=null;
- long min_interval=5000; // minimum time between executions of the FindSubgroups task
- long max_interval=20000; // maximum time between executions of the FindSubgroups task
- boolean is_coord=false;
- final Vector mbrs=new Vector();
- TimeScheduler timer=null;
- CoordinatorAnnouncer announcer_task=null;
- final Set announcements=Collections.synchronizedSet(new HashSet());
-
- /** Use a new thread to send the MERGE event up the stack */
- boolean use_separate_thread=false;
-
-
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "MERGE3";
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("min_interval");
- if(str != null) {
- min_interval=Long.parseLong(str);
- props.remove("min_interval");
- }
-
- str=props.getProperty("max_interval");
- if(str != null) {
- max_interval=Long.parseLong(str);
- props.remove("max_interval");
- }
-
- if(min_interval <= 0 || max_interval <= 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE3_MIN_INTERVAL_AND_MAX_INTERVAL_HAVE_TO_BE__0);
- return false;
- }
- if(max_interval <= min_interval) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.MERGE3_MAX_INTERVAL_HAS_TO_BE_GREATER_THAN_MIN_INTERVAL);
- return false;
- }
-
- str=props.getProperty("use_separate_thread");
- if(str != null) {
- use_separate_thread=Boolean.valueOf(str).booleanValue();
- props.remove("use_separate_thread");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.MERGE3_MERGE2SETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
- @Override // GemStoneAddition
- public void init() throws Exception {
- timer=stack.timer;
- }
-
-
- /**
- * This prevents the up-handler thread to be created, which is not needed in the protocol.
- * DON'T REMOVE !
- */
- @Override // GemStoneAddition
- public void startUpHandler() {
- }
-
-
- /**
- * This prevents the down-handler thread to be created, which is not needed in the protocol.
- * DON'T REMOVE !
- */
- @Override // GemStoneAddition
- public void startDownHandler() {
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- switch(evt.getType()) {
-
- case Event.MSG:
- Message msg=(Message)evt.getArg();
- CoordAnnouncement hdr=(CoordAnnouncement)msg.removeHeader(getName());
- if(hdr != null) {
- if(hdr.coord_addr != null && is_coord) {
- boolean contains;
- contains=announcements.contains(hdr.coord_addr);
- announcements.add(hdr.coord_addr);
- if(log.isDebugEnabled()) {
- if(contains)
- log.debug("discarded duplicate announcement: " + hdr.coord_addr +
- ", announcements=" + announcements);
- else
- log.debug("received announcement: " + hdr.coord_addr + ", announcements=" + announcements);
- }
-
- if(announcements.size() > 1 && is_coord) {
- processAnnouncements();
- }
- }
- }
- else
- passUp(evt);
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- passUp(evt);
- break;
-
- default:
- passUp(evt); // Pass up to the layer above us
- break;
- }
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- Vector tmp;
- Address coord;
-
- switch(evt.getType()) {
-
- case Event.VIEW_CHANGE:
- passDown(evt);
- tmp=((View)evt.getArg()).getMembers();
- mbrs.clear();
- mbrs.addAll(tmp);
- coord=(Address)mbrs.elementAt(0);
- if(coord.equals(local_addr)) {
- if(is_coord == false) {
- is_coord=true;
- startCoordAnnouncerTask();
- }
- }
- else {
- if(is_coord == true) {
- is_coord=false;
- stopCoordAnnouncerTask();
- }
- }
- break;
-
- default:
- passDown(evt); // Pass on to the layer below us
- break;
- }
- }
-
-
- void startCoordAnnouncerTask() {
- if(announcer_task == null) {
- announcements.add(local_addr);
- announcer_task=new CoordinatorAnnouncer();
- timer.add(announcer_task);
- if(log.isDebugEnabled())
- log.debug("coordinator announcement task started, announcements=" + announcements);
- }
- }
-
- void stopCoordAnnouncerTask() {
- if(announcer_task != null) {
- announcer_task.stop();
- announcer_task=null;
- announcements.clear();
- if(log.isDebugEnabled())
- log.debug("coordinator announcement task stopped");
- }
- }
-
-
-
- /**
- * Returns a random value within [min_interval - max_interval]
- */
- long computeInterval() {
- return min_interval + Util.random(max_interval - min_interval);
- }
-
-
-
- void sendCoordinatorAnnouncement(Address coord) {
- Message coord_announcement=new Message(); // multicast to all
- CoordAnnouncement hdr=new CoordAnnouncement(coord);
- coord_announcement.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, coord_announcement));
- }
-
- void processAnnouncements() {
- if(announcements.size() > 1) {
- Vector coords=new Vector(announcements); // create a clone
- if(coords.size() > 1) {
- if(log.isDebugEnabled())
- log.debug("passing up MERGE event, coords=" + coords);
- final Event evt=new Event(Event.MERGE, coords);
- if(use_separate_thread) {
- Thread merge_notifier=new Thread() {
- @Override // GemStoneAddition
- public void run() {
- passUp(evt);
- }
- };
- merge_notifier.setDaemon(true);
- merge_notifier.setName("merge notifier thread");
- }
- else {
- passUp(evt);
- }
- }
- announcements.clear();
- }
- }
-
-
- class CoordinatorAnnouncer implements TimeScheduler.Task {
- boolean cancelled=false;
-
- public void start() {
- cancelled=false;
- }
-
- public void stop() {
- cancelled=true;
- }
-
- public boolean cancelled() {
- return cancelled;
- }
-
- public long nextInterval() {
- return computeInterval();
- }
-
- public void run() {
- if(is_coord)
- sendCoordinatorAnnouncement(local_addr);
- }
- }
-
-
-
- public static class CoordAnnouncement extends Header {
- Address coord_addr=null;
-
- public CoordAnnouncement() {
- }
-
- public CoordAnnouncement(Address coord) {
- this.coord_addr=coord;
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- coord_addr=(Address)in.readObject();
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(coord_addr);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java
deleted file mode 100644
index 6f98867..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MERGEFAST.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Vector;
-
-/**
- * The coordinator attaches a small header to each (or every nth) message. If another coordinator <em>in the
- * same group</em> sees the message, it will initiate the merge protocol immediately by sending a MERGE
- * event up the stack.
- * @author Bela Ban, Aug 25 2003
- */
-public class MERGEFAST extends Protocol {
- Address local_addr=null;
- boolean is_coord=false;
- static final String name="MERGEFAST";
-
- @Override // GemStoneAddition
- public String getName() {
- return name;
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- if(is_coord == true && evt.getType() == Event.MSG && local_addr != null) {
- Message msg=(Message)evt.getArg();
- Address dest=msg.getDest();
- if(dest == null || dest.isMulticastAddress()) {
- msg.putHeader(getName(), new MergefastHeader(local_addr));
- }
- }
-
- if(evt.getType() == Event.VIEW_CHANGE) {
- handleViewChange((View)evt.getArg());
- }
-
- passDown(evt);
- }
-
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- switch(evt.getType()) {
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
- case Event.MSG:
- if(is_coord == false) // only handle message if we are coordinator
- break;
- Message msg=(Message)evt.getArg();
- MergefastHeader hdr=(MergefastHeader)msg.removeHeader(name);
- passUp(evt);
- if(hdr != null && local_addr != null) {
- Address other_coord=hdr.coord;
- if(!local_addr.equals(other_coord)) {
- sendUpMerge(new Address[]{local_addr, other_coord});
- }
- }
- return; // event was already passed up
- case Event.VIEW_CHANGE:
- handleViewChange((View)evt.getArg());
- break;
- }
- passUp(evt);
- }
-
-
- void handleViewChange(View v) {
- Vector mbrs;
- if(local_addr == null)
- return;
- mbrs=v.getMembers();
- is_coord=mbrs != null && mbrs.size() > 0 && local_addr.equals(mbrs.firstElement());
- }
-
- // @todo avoid sending up too many MERGE events.
- void sendUpMerge(Address[] addresses) {
- Vector v=new Vector(11);
- for(int i=0; i < addresses.length; i++) {
- Address addr=addresses[i];
- v.add(addr);
- }
- passUp(new Event(Event.MERGE, v));
- }
-
-
- public static class MergefastHeader extends Header {
- Address coord=null;
-
- public MergefastHeader() {
- }
-
- public MergefastHeader(Address coord) {
- this.coord=coord;
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(coord);
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- coord=(Address)in.readObject();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java
deleted file mode 100644
index ff206ed..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/MessageProtocolEXAMPLE.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: MessageProtocolEXAMPLE.java,v 1.2 2004/03/30 06:47:21 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-
-
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.MessageProtocol;
-
-
-
-
-
-
-/**
-
- */
-public class MessageProtocolEXAMPLE extends MessageProtocol {
-
- @Override // GemStoneAddition
- public String getName() {return "MessageProtocolEXAMPLE";}
-
-
- /**
- <b>Callback</b>. Called when a request for this protocol layer is received.
- */
- @Override // GemStoneAddition
- public Object handle(Message req) {
- System.out.println("MessageProtocolEXAMPLE.handle(): this method should be overridden !");
- return null;
- }
-
-
-
-
- /**
- <b>Callback</b>. Called by superclass when event may be handled.<p>
- <b>Do not use <code>PassUp</code> in this method as the event is passed up
- by default by the superclass after this method returns !</b>
- @return boolean Defaults to true. If false, event will not be passed up the stack.
- */
- @Override // GemStoneAddition
- public boolean handleUpEvent(Event evt) {return true;}
-
-
- /**
- <b>Callback</b>. Called by superclass when event may be handled.<p>
- <b>Do not use <code>PassDown</code> in this method as the event is passed down
- by default by the superclass after this method returns !</b>
- @return boolean Defaults to true. If false, event will not be passed down the stack.
- */
- @Override // GemStoneAddition
- public boolean handleDownEvent(Event evt) {return true;}
-
-
-
-}