You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ace.apache.org by ma...@apache.org on 2011/09/14 13:21:33 UTC

svn commit: r1170538 - in /incubator/ace/trunk/ace-gateway-log/src: main/java/org/apache/ace/gateway/log/task/LogSyncTask.java test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java

Author: marrs
Date: Wed Sep 14 11:21:32 2011
New Revision: 1170538

URL: http://svn.apache.org/viewvc?rev=1170538&view=rev
Log:
ACE-173 more implementation changes

Modified:
    incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java
    incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java

Modified: incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java?rev=1170538&r1=1170537&r2=1170538&view=diff
==============================================================================
--- incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java (original)
+++ incubator/ace/trunk/ace-gateway-log/src/main/java/org/apache/ace/gateway/log/task/LogSyncTask.java Wed Sep 14 11:21:32 2011
@@ -67,9 +67,7 @@ public class LogSyncTask implements Runn
         if (host == null) {
             // expected if there's no discovered
             // ps or relay server
-            m_log.log(LogService.LOG_WARNING,
-                    "Unable to synchronize log with remote (endpoint="
-                            + m_endpoint + ") - none available");
+            m_log.log(LogService.LOG_WARNING, "Unable to synchronize log with remote (endpoint=" + m_endpoint + ") - none available");
             return;
         }
 
@@ -80,18 +78,17 @@ public class LogSyncTask implements Runn
             for (int i = 0; i < logIDs.length; i++) {
                 Connection queryConnection = new Connection(new URL(host,
                         m_endpoint + "/" + COMMAND_QUERY + "?"
-                                + PARAMETER_GATEWAYID + "=" + gatewayID + "&"
-                                + PARAMETER_LOGID + "=" + logIDs[i]));
+                            + PARAMETER_GATEWAYID + "=" + gatewayID + "&"
+                            + PARAMETER_LOGID + "=" + logIDs[i]));
                 // TODO: make sure no actual call is made using sendConnection
                 // when there's nothing to sync
-                synchronizeLog(logIDs[i], queryConnection.getInputStream(),
-                        sendConnection);
+                synchronizeLog(logIDs[i], queryConnection.getInputStream(), sendConnection);
             }
-        } catch (IOException e) {
-            m_log.log(LogService.LOG_ERROR,
-                    "Unable to (fully) synchronize log with remote (endpoint="
-                            + m_endpoint + ")", e);
-        } finally {
+        }
+        catch (IOException e) {
+            m_log.log(LogService.LOG_ERROR, "Unable to (fully) synchronize log with remote (endpoint=" + m_endpoint + ")", e);
+        }
+        finally {
             if (sendConnection != null) {
                 sendConnection.close();
             }
@@ -114,8 +111,7 @@ public class LogSyncTask implements Runn
      *             If synchronization could not be completed due to an I/O
      *             failure.
      */
-    protected void synchronizeLog(long logID, InputStream queryInput,
-            Connection sendConnection) throws IOException {
+    protected void synchronizeLog(long logID, InputStream queryInput, Connection sendConnection) throws IOException {
         long highestLocal = m_LogStore.getHighestID(logID);
         if (highestLocal == 0) {
             // No events, no need to synchronize
@@ -126,23 +122,23 @@ public class LogSyncTask implements Runn
         SortedRangeSet delta = remoteRange.diffDest(localRange);
         RangeIterator rangeIterator = delta.iterator();
         BufferedWriter writer = null;
-        writer = new BufferedWriter(new OutputStreamWriter(
-                sendConnection.getOutputStream()));
+        writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
         if (rangeIterator.hasNext()) {
             long lowest = rangeIterator.next();
             long highest = delta.getHigh();
             if (lowest <= highest) {
-                List events = m_LogStore.get(logID, lowest,
-                        highestLocal > highest ? highest : highestLocal);
+                List events = m_LogStore.get(logID, lowest, highestLocal > highest ? highest : highestLocal);
                 Iterator iter = events.iterator();
                 while (iter.hasNext()) {
                     LogEvent current = (LogEvent) iter.next();
-                    while ((current.getID() > lowest)
-                            && rangeIterator.hasNext()) {
+                    while ((current.getID() > lowest) && rangeIterator.hasNext()) {
                         lowest = rangeIterator.next();
                     }
                     if (current.getID() == lowest) {
-                        writer.write(current.toRepresentation() + "\n");
+                        // before we send the LogEvent to the other side, we fill out the
+                        // appropriate identification
+                        LogEvent event = new LogEvent(m_identification.getID(), current);
+                        writer.write(event.toRepresentation() + "\n");
                     }
                 }
             }
@@ -160,8 +156,7 @@ public class LogSyncTask implements Runn
      * @throws java.io.IOException
      *             If no range could be determined due to an I/O failure.
      */
-    protected LogDescriptor getDescriptor(InputStream queryInput)
-            throws IOException {
+    protected LogDescriptor getDescriptor(InputStream queryInput) throws IOException {
         BufferedReader queryReader = null;
         try {
             queryReader = new BufferedReader(new InputStreamReader(queryInput));
@@ -171,13 +166,11 @@ public class LogSyncTask implements Runn
                     return new LogDescriptor(rangeString);
                 } 
                 catch (IllegalArgumentException iae) {
-                    throw new IOException(
-                            "Could not determine highest remote event id, received malformed event range ("
-                                    + rangeString + ")");
+                    throw new IOException("Could not determine highest remote event id, received malformed event range (" + rangeString + ")");
                 }
-            } else {
-                throw new IOException(
-                        "Could not construct LogDescriptor from stream because stream is empty");
+            }
+            else {
+                throw new IOException("Could not construct LogDescriptor from stream because stream is empty");
             }
         } 
         finally {

Modified: incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java
URL: http://svn.apache.org/viewvc/incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java?rev=1170538&r1=1170537&r2=1170538&view=diff
==============================================================================
--- incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java (original)
+++ incubator/ace/trunk/ace-gateway-log/src/test/java/org/apache/ace/gateway/log/task/LogSyncTaskTest.java Wed Sep 14 11:21:32 2011
@@ -42,20 +42,25 @@ import org.testng.annotations.Test;
 
 public class LogSyncTaskTest {
 
+    private static final String GW_ID = "gwID";
     private LogSyncTask m_task;
 
     @BeforeMethod(alwaysRun = true)
     protected void setUp() throws Exception {
         m_task = new LogSyncTask("testlog");
         TestUtils.configureObject(m_task, LogService.class);
-        TestUtils.configureObject(m_task, Identification.class);
+        TestUtils.configureObject(m_task, Identification.class, new Identification() {
+            public String getID() {
+                return GW_ID;
+            }
+        });
         TestUtils.configureObject(m_task, Discovery.class);
         TestUtils.configureObject(m_task, LogStore.class);
     }
 
     @Test(groups = { UNIT })
     public synchronized void getRange() throws Exception {
-        final LogDescriptor range = new LogDescriptor("gwID", 1, new SortedRangeSet("1-10"));
+        final LogDescriptor range = new LogDescriptor(GW_ID, 1, new SortedRangeSet("1-10"));
         m_task.getDescriptor(new InputStream() {
             int m_count = 0;
             byte[] m_bytes = (range.toRepresentation() + "\n").getBytes();
@@ -74,8 +79,8 @@ public class LogSyncTaskTest {
 
     @Test(groups = { UNIT })
     public synchronized void synchronizeLog() throws Exception {
-        final LogDescriptor range = new LogDescriptor("gwID", 1, new SortedRangeSet(new long[] {0}));
-        final LogEvent event = new LogEvent("gwID", 1, 1, 1, 1, new Properties());
+        final LogDescriptor range = new LogDescriptor(GW_ID, 1, new SortedRangeSet(new long[] {0}));
+        final LogEvent event = new LogEvent(GW_ID, 1, 1, 1, 1, new Properties());
         final List<LogEvent> events = new ArrayList<LogEvent>();
         events.add(event);