You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:10 UTC

[07/51] [abbrv] incubator-nifi git commit: NIFI-282: Refactoring to allow for separate client

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..1e33e1f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -23,6 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -37,10 +39,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnectionState;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.client.socket.EndpointConnection;
+import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
 import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
@@ -50,6 +51,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,9 +68,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
     private final AtomicBoolean targetExists = new AtomicBoolean(true);
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
+    private final SSLContext sslContext;
     private final TransferDirection transferDirection;
     
-    private final EndpointConnectionStatePool connectionStatePool;
+    private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
     
     private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
     private final Lock interruptLock = new ReentrantLock();
@@ -83,9 +86,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         this.remoteGroup = remoteGroup;
         this.transferDirection = direction;
+        this.sslContext = sslContext;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
-        
-        connectionStatePool = remoteGroup.getConnectionPool();
+    }
+    
+    private static File getPeerPersistenceFile(final String portId) {
+        final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
+        return new File(stateDir, portId + ".peers");
     }
     
     @Override
@@ -111,6 +118,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         } finally {
             interruptLock.unlock();
         }
+        
+        final EndpointConnectionPool pool = connectionPoolRef.get();
+        if ( pool != null ) {
+            pool.shutdown();
+        }
     }
     
     @Override
@@ -123,6 +135,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         } finally {
             interruptLock.unlock();
         }
+        
+        final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(), 
+                remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
+                sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
+        connectionPoolRef.set(connectionPool);
     }
     
     
@@ -140,9 +157,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         String url = getRemoteProcessGroup().getTargetUri().toString();
         
-        final EndpointConnectionState connectionState;
+        final EndpointConnectionPool connectionPool = connectionPoolRef.get();
+        final EndpointConnection connection;
         try {
-        	connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+        	connection = connectionPool.getEndpointConnection(this, transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
@@ -157,7 +175,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             logger.error(message);
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
-        } catch (final HandshakeException | IOException e) {
+        } catch (final IOException e) {
             final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
             logger.error(message);
             if ( logger.isDebugEnabled() ) {
@@ -168,15 +186,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             return;
         }
         
-        if ( connectionState == null ) {
+        if ( connection == null ) {
             logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
             context.yield();
             return;
         }
         
-        FlowFileCodec codec = connectionState.getCodec();
-        SocketClientProtocol protocol = connectionState.getSocketClientProtocol();
-        final Peer peer = connectionState.getPeer();
+        FlowFileCodec codec = connection.getCodec();
+        SocketClientProtocol protocol = connection.getSocketClientProtocol();
+        final Peer peer = connection.getPeer();
         url = peer.getUrl();
         
         try {
@@ -194,7 +212,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
                 transferFlowFiles(peer, protocol, context, session, codec);
             } else {
-                receiveFlowFiles(peer, protocol, context, session, codec);
+                final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+                if ( numReceived == 0 ) {
+                    context.yield();
+                }
             }
 
             interruptLock.lock();
@@ -210,13 +231,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
             session.commit();
             
-            connectionState.setLastTimeUsed();
-            connectionStatePool.offer(connectionState);
+            connection.setLastTimeUsed();
+            connectionPool.offer(connection);
         } catch (final TransmissionDisabledException e) {
             cleanup(protocol, peer);
             session.rollback();
         } catch (final Exception e) {
-            connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
+            connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
 
             final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
             logger.error(message);
@@ -261,12 +282,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     }
     
     
-    private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        protocol.transferFlowFiles(peer, context, session, codec);
+    private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+        return protocol.transferFlowFiles(peer, context, session, codec);
     }
     
-    private void receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        protocol.receiveFlowFiles(peer, context, session, codec);
+    private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+        return protocol.receiveFlowFiles(peer, context, session, codec);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index dca1d84..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
-    private final SSLSocketChannel channel;
-    private final SSLSocketChannelInput request;
-    private final SSLSocketChannelOutput response;
-    
-    public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
-        super(uri);
-        request = new SSLSocketChannelInput(channel);
-        response = new SSLSocketChannelOutput(channel);
-        this.channel = channel;
-    }
-    
-    @Override
-    public SSLSocketChannelInput getInput() {
-        return request;
-    }
-
-    @Override
-    public SSLSocketChannelOutput getOutput() {
-        return response;
-    }
-
-    @Override
-    public void setTimeout(final int millis) throws IOException {
-        channel.setTimeout(millis);
-    }
-
-    @Override
-    public int getTimeout() throws IOException {
-        return channel.getTimeout();
-    }
-
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-    
-    @Override
-    public boolean isClosed() {
-        return channel.isClosed();
-    }
-    
-    @Override
-    public boolean isDataAvailable() {
-        try {
-            return request.isDataAvailable();
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return response.getBytesWritten();
-    }
-
-    @Override
-    public long getBytesRead() {
-        return request.getBytesRead();
-    }
-
-    @Override
-    public void interrupt() {
-        channel.interrupt();
-    }
-    
-    @Override
-    public String toString() {
-        return super.toString() + "[SSLSocketChannel=" + channel + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 60ef33f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
-    private final SSLSocketChannelInputStream in;
-    private final ByteCountingInputStream countingIn;
-    private final InputStream bufferedIn;
-    
-    public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
-        in = new SSLSocketChannelInputStream(socketChannel);
-        countingIn = new ByteCountingInputStream(in);
-        this.bufferedIn = new BufferedInputStream(countingIn);
-    }
-    
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return bufferedIn;
-    }
-    
-    public boolean isDataAvailable() throws IOException {
-        return bufferedIn.available() > 0;
-    }
-    
-    @Override
-    public long getBytesRead() {
-        return countingIn.getBytesRead();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index dc3d68f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
-    private final OutputStream out;
-    private final ByteCountingOutputStream countingOut;
-    
-    public SSLSocketChannelOutput(final SSLSocketChannel channel) {
-        countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
-        out = new BufferedOutputStream(countingOut);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        return out;
-    }
-    
-    @Override
-    public long getBytesWritten() {
-        return countingOut.getBytesWritten();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index eb22b0e..63c960d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -430,7 +430,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         
         logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
         if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-            peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+            peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
         } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
             throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
         }