You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/20 05:12:54 UTC

[17/50] [abbrv] incubator-nifi git commit: NIFI-74, NIFI-345, NIFI-495: Fixed several site-to-site related bugs

NIFI-74, NIFI-345, NIFI-495: Fixed several site-to-site related bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b682b6fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b682b6fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b682b6fa

Branch: refs/heads/NIFI-271
Commit: b682b6fab543cabeb3a321d6e5cf22f7ce9968c1
Parents: e9cb3b3
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 9 17:59:02 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Apr 9 17:59:02 2015 -0400

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   | 43 ++++++++++++++++----
 .../nifi/remote/client/socket/SocketClient.java |  3 +-
 .../protocol/socket/SocketClientProtocol.java   |  8 +++-
 .../socket/SocketClientTransaction.java         | 25 ++++++++----
 .../nifi/remote/StandardRemoteGroupPort.java    |  1 +
 5 files changed, 61 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b682b6fa/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 1a6dfd5..1b5412c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -88,6 +88,7 @@ import org.apache.nifi.web.api.dto.ControllerDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.helpers.MessageFormatter;
 
 public class EndpointConnectionPool {
     public static final long PEER_REFRESH_PERIOD = 60000L;
@@ -202,6 +203,28 @@ public class EndpointConnectionPool {
     	}, 5, 5, TimeUnit.SECONDS);
     }
     
+    void warn(final String msg, final Object... args) {
+    	logger.warn(msg, args);
+    	if ( eventReporter != null ) {
+    		eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
+    	}
+    }
+    
+    void warn(final String msg, final Throwable t) {
+    	logger.warn(msg, t);
+    	
+    	if ( eventReporter != null ) {
+    		eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString());
+    	}
+    }
+    
+    void error(final String msg, final Object... args) {
+    	logger.error(msg, args);
+    	if ( eventReporter != null ) {
+    		eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
+    	}
+    }
+    
     private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
         if ( remoteDestination.getIdentifier() != null ) {
             return remoteDestination.getIdentifier();
@@ -271,6 +294,7 @@ public class EndpointConnectionPool {
                     logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
                     protocol = new SocketClientProtocol();
                     protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
+                    protocol.setEventReporter(eventReporter);
 
                     final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                     try {
@@ -312,7 +336,9 @@ public class EndpointConnectionPool {
                         
                         // handle error cases
                         if ( protocol.isDestinationFull() ) {
-                            logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
+                            logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", 
+                            		this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName());
+                            
                             penalize(peer, penalizationMillis);
                             try {
                             	peer.close();
@@ -341,7 +367,7 @@ public class EndpointConnectionPool {
                         cleanup(protocol, peer);
                         
                         final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
-                        logger.error(message);
+                        error(message);
                         if ( logger.isDebugEnabled() ) {
                             logger.error("", e);
                         }
@@ -463,7 +489,7 @@ public class EndpointConnectionPool {
                         peerList = createPeerStatusList(direction);
                     } catch (final Exception e) {
                         final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                        logger.warn(message);
+                        warn(message);
                         if ( logger.isDebugEnabled() ) {
                             logger.warn("", e);
                         }
@@ -503,7 +529,7 @@ public class EndpointConnectionPool {
     }
     
     private boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
+        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
         return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
     }
     
@@ -587,7 +613,7 @@ public class EndpointConnectionPool {
             clientProtocol.shutdown(peer);
         } catch (final IOException e) {
             final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
+            warn(message);
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
@@ -597,7 +623,7 @@ public class EndpointConnectionPool {
             peer.close();
         } catch (final IOException e) {
             final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
+            warn(message);
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
@@ -622,7 +648,8 @@ public class EndpointConnectionPool {
             }
 
         } catch (final IOException e) {
-            logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
+            error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString());
+            logger.error("", e);
         }
     }
 
@@ -818,7 +845,7 @@ public class EndpointConnectionPool {
             peerStatusCache = new PeerStatusCache(statuses);
             logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
         } catch (Exception e) {
-            logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
+            warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b682b6fa/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index ed54ccb..4aab3f7 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -84,6 +84,7 @@ public class SocketClient implements SiteToSiteClient {
 		    logger.debug("Unable to resolve port [{}] to an identifier", portName);
 		} else {
 		    logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
+		    this.portIdentifier = portId;
 		}
 		
 		return portId;
@@ -136,7 +137,7 @@ public class SocketClient implements SiteToSiteClient {
 				connectionState.getPeer(), connectionState.getCodec(), direction);
 		} catch (final Throwable t) {
 			pool.terminate(connectionState);
-			throw t;
+			throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
 		}
 		
 		// Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b682b6fa/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index c3275ea..83c5305 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
@@ -75,6 +76,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private int batchCount;
     private long batchSize;
     private long batchMillis;
+    private EventReporter eventReporter;
 
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     
@@ -93,6 +95,10 @@ public class SocketClientProtocol implements ClientProtocol {
         this.batchMillis = millis;
     }
     
+    public void setEventReporter(final EventReporter eventReporter) {
+    	this.eventReporter = eventReporter;
+    }
+    
     public void setDestination(final RemoteDestination destination) {
         this.destination = destination;
         this.useCompression = destination.isUseCompression();
@@ -272,7 +278,7 @@ public class SocketClientProtocol implements ClientProtocol {
         }
         
         return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, 
-        		direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+        		direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b682b6fa/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index a1ce07e..e69104f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -27,6 +27,7 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Communicant;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.Transaction;
@@ -39,6 +40,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +58,7 @@ public class SocketClientTransaction implements Transaction {
 	private final Peer peer;
 	private final int penaltyMillis;
 	private final String destinationId;
+	private final EventReporter eventReporter;
 	
 	private boolean dataAvailable = false;
 	private int transfers = 0;
@@ -63,7 +66,7 @@ public class SocketClientTransaction implements Transaction {
 	private TransactionState state;
 	
 	SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, 
-			final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
+			final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
 		this.protocolVersion = protocolVersion;
 		this.destinationId = destinationId;
 		this.peer = peer;
@@ -74,6 +77,7 @@ public class SocketClientTransaction implements Transaction {
 		this.compress = useCompression;
 		this.state = TransactionState.TRANSACTION_STARTED;
 		this.penaltyMillis = penaltyMillis;
+		this.eventReporter = eventReporter;
 		
 		initialize();
 	}
@@ -116,11 +120,11 @@ public class SocketClientTransaction implements Transaction {
 	    try {
 	        try {
         		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-        			throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+        			throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
         		}
         		
             	if ( direction == TransferDirection.SEND ) {
-            	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+            	    throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
             	}
             	
             	// if we already know there's no data, just return null
@@ -142,7 +146,7 @@ public class SocketClientTransaction implements Transaction {
                             this.dataAvailable = false;
                             break;
                         default:
-                            throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+                            throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode);
                     }
                 }
             	
@@ -184,11 +188,11 @@ public class SocketClientTransaction implements Transaction {
 	    try {
 	        try {
         		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-        			throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+        			throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
         		}
         
                 if ( direction == TransferDirection.RECEIVE ) {
-                    throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+                    throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction");
                 }
         
         		if ( transfers > 0 ) {
@@ -242,7 +246,7 @@ public class SocketClientTransaction implements Transaction {
 	    try {
 	        try {
         		if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
-        			throw new IllegalStateException("Cannot complete transaction because state is " + state + 
+        			throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + 
         					"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
         		}
         		
@@ -272,7 +276,7 @@ public class SocketClientTransaction implements Transaction {
                         peer.penalize(destinationId, penaltyMillis);
                         backoff = true;
                     } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
-                        throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+                        throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse);
                     }
                     
                     state = TransactionState.TRANSACTION_COMPLETED;
@@ -324,7 +328,10 @@ public class SocketClientTransaction implements Transaction {
                     try {
                         confirmTransactionResponse = Response.read(dis);
                     } catch (final IOException ioe) {
-                        logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
+                        logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer);
+                        if ( eventReporter != null ) {
+                        	eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
+                        }
                         throw ioe;
                     }
                     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b682b6fa/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 69ba0fd..eec6ed5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -183,6 +183,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
         } catch (final IOException e) {
+        	context.yield();
             final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
             logger.error(message);
             if ( logger.isDebugEnabled() ) {