You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ma...@apache.org on 2011/09/14 13:21:33 UTC
svn commit: r1170538 - in /incubator/ace/trunk/ace-gateway-log/src:
main/java/org/apache/ace/gateway/log/task/LogSyncTask.java
test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java
Author: marrs
Date: Wed Sep 14 11:21:32 2011
New Revision: 1170538
URL: http://svn.apache.org/viewvc?rev=1170538&view=rev
Log:
ACE-173 more implementation changes
Modified:
incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java
incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java
Modified: incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java?rev=1170538&r1=1170537&r2=1170538&view=diff
==============================================================================
--- incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java (original)
+++ incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java Wed Sep 14 11:21:32 2011
@@ -67,9 +67,7 @@ public class LogSyncTask implements Runn
if (host == null) {
// expected if there's no discovered
// ps or relay server
- m_log.log(LogService.LOG_WARNING,
- "Unable to synchronize log with remote (endpoint="
- + m_endpoint + ") - none available");
+ m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
return;
}
@@ -80,18 +78,17 @@ public class LogSyncTask implements Runn
for (int i = 0; i < logIDs.length; i++) {
Connection queryConnection = new Connection(new URL(host,
m_endpoint + "/" + COMMAND_QUERY + "?"
- + PARAMETER_GATEWAYID + "=" + gatewayID + "&"
- + PARAMETER_LOGID + "=" + logIDs[i]));
+ + PARAMETER_GATEWAYID + "=" + gatewayID + "&"
+ + PARAMETER_LOGID + "=" + logIDs[i]));
// TODO: make sure no actual call is made using sendConnection
// when there's nothing to sync
- synchronizeLog(logIDs[i], queryConnection.getInputStream(),
- sendConnection);
+ synchronizeLog(logIDs[i], queryConnection.getInputStream(), sendConnection);
}
- } catch (IOException e) {
- m_log.log(LogService.LOG_ERROR,
- "Unable to (fully) synchronize log with remote (endpoint="
- + m_endpoint + ")", e);
- } finally {
+ }
+ catch (IOException e) {
+ m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e);
+ }
+ finally {
if (sendConnection != null) {
sendConnection.close();
}
@@ -114,8 +111,7 @@ public class LogSyncTask implements Runn
* If synchronization could not be completed due to an I/O
* failure.
*/
- protected void synchronizeLog(long logID, InputStream queryInput,
- Connection sendConnection) throws IOException {
+ protected void synchronizeLog(long logID, InputStream queryInput, Connection sendConnection) throws IOException {
long highestLocal = m_LogStore.getHighestID(logID);
if (highestLocal == 0) {
// No events, no need to synchronize
@@ -126,23 +122,23 @@ public class LogSyncTask implements Runn
SortedRangeSet delta = remoteRange.diffDest(localRange);
RangeIterator rangeIterator = delta.iterator();
BufferedWriter writer = null;
- writer = new BufferedWriter(new OutputStreamWriter(
- sendConnection.getOutputStream()));
+ writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
if (rangeIterator.hasNext()) {
long lowest = rangeIterator.next();
long highest = delta.getHigh();
if (lowest <= highest) {
- List events = m_LogStore.get(logID, lowest,
- highestLocal > highest ? highest : highestLocal);
+ List events = m_LogStore.get(logID, lowest, highestLocal > highest ? highest : highestLocal);
Iterator iter = events.iterator();
while (iter.hasNext()) {
LogEvent current = (LogEvent) iter.next();
- while ((current.getID() > lowest)
- && rangeIterator.hasNext()) {
+ while ((current.getID() > lowest) && rangeIterator.hasNext()) {
lowest = rangeIterator.next();
}
if (current.getID() == lowest) {
- writer.write(current.toRepresentation() + "\n");
+ // before we send the LogEvent to the other side, we fill out the
+ // appropriate identification
+ LogEvent event = new LogEvent(m_identification.getID(), current);
+ writer.write(event.toRepresentation() + "\n");
}
}
}
@@ -160,8 +156,7 @@ public class LogSyncTask implements Runn
* @throws java.io.IOException
* If no range could be determined due to an I/O failure.
*/
- protected LogDescriptor getDescriptor(InputStream queryInput)
- throws IOException {
+ protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException {
BufferedReader queryReader = null;
try {
queryReader = new BufferedReader(new InputStreamReader(queryInput));
@@ -171,13 +166,11 @@ public class LogSyncTask implements Runn
return new LogDescriptor(rangeString);
}
catch (IllegalArgumentException iae) {
- throw new IOException(
- "Could not determine highest remote event id, received malformed event range ("
- + rangeString + ")");
+ throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")");
}
- } else {
- throw new IOException(
- "Could not construct LogDescriptor from stream because stream is empty");
+ }
+ else {
+ throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
}
}
finally {
Modified: incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java?rev=1170538&r1=1170537&r2=1170538&view=diff
==============================================================================
--- incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java (original)
+++ incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java Wed Sep 14 11:21:32 2011
@@ -42,20 +42,25 @@ import org.testng.annotations.Test;
public class LogSyncTaskTest {
+ private static final String GW_ID = "gwID";
private LogSyncTask m_task;
@BeforeMethod(alwaysRun = true)
protected void setUp() throws Exception {
m_task = new LogSyncTask("testlog");
TestUtils.configureObject(m_task, LogService.class);
- TestUtils.configureObject(m_task, Identification.class);
+ TestUtils.configureObject(m_task, Identification.class, new Identification() {
+ public String getID() {
+ return GW_ID;
+ }
+ });
TestUtils.configureObject(m_task, Discovery.class);
TestUtils.configureObject(m_task, LogStore.class);
}
@Test(groups = { UNIT })
public synchronized void getRange() throws Exception {
- final LogDescriptor range = new LogDescriptor("gwID", 1, new SortedRangeSet("1-10"));
+ final LogDescriptor range = new LogDescriptor(GW_ID, 1, new SortedRangeSet("1-10"));
m_task.getDescriptor(new InputStream() {
int m_count = 0;
byte[] m_bytes = (range.toRepresentation() + "\n").getBytes();
@@ -74,8 +79,8 @@ public class LogSyncTaskTest {
@Test(groups = { UNIT })
public synchronized void synchronizeLog() throws Exception {
- final LogDescriptor range = new LogDescriptor("gwID", 1, new SortedRangeSet(new long[] {0}));
- final LogEvent event = new LogEvent("gwID", 1, 1, 1, 1, new Properties());
+ final LogDescriptor range = new LogDescriptor(GW_ID, 1, new SortedRangeSet(new long[] {0}));
+ final LogEvent event = new LogEvent(GW_ID, 1, 1, 1, 1, new Properties());
final List<LogEvent> events = new ArrayList<LogEvent>();
events.add(event);