You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:10 UTC
[07/51] [abbrv] incubator-nifi git commit: NIFI-282: Refactoring to
allow for separate client
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..1e33e1f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -23,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,10 +39,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnectionState;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.client.socket.EndpointConnection;
+import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
@@ -50,6 +51,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,9 +68,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final AtomicBoolean useCompression = new AtomicBoolean(false);
private final AtomicBoolean targetExists = new AtomicBoolean(true);
private final AtomicBoolean targetRunning = new AtomicBoolean(true);
+ private final SSLContext sslContext;
private final TransferDirection transferDirection;
- private final EndpointConnectionStatePool connectionStatePool;
+ private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
private final Lock interruptLock = new ReentrantLock();
@@ -83,9 +86,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
+ this.sslContext = sslContext;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
-
- connectionStatePool = remoteGroup.getConnectionPool();
+ }
+
+ private static File getPeerPersistenceFile(final String portId) {
+ final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
+ return new File(stateDir, portId + ".peers");
}
@Override
@@ -111,6 +118,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
+
+ final EndpointConnectionPool pool = connectionPoolRef.get();
+ if ( pool != null ) {
+ pool.shutdown();
+ }
}
@Override
@@ -123,6 +135,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
+
+ final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(),
+ remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
+ sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
+ connectionPoolRef.set(connectionPool);
}
@@ -140,9 +157,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
- final EndpointConnectionState connectionState;
+ final EndpointConnectionPool connectionPool = connectionPoolRef.get();
+ final EndpointConnection connection;
try {
- connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+ connection = connectionPool.getEndpointConnection(this, transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -157,7 +175,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
logger.error(message);
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
- } catch (final HandshakeException | IOException e) {
+ } catch (final IOException e) {
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) {
@@ -168,15 +186,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
- if ( connectionState == null ) {
+ if ( connection == null ) {
logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
context.yield();
return;
}
- FlowFileCodec codec = connectionState.getCodec();
- SocketClientProtocol protocol = connectionState.getSocketClientProtocol();
- final Peer peer = connectionState.getPeer();
+ FlowFileCodec codec = connection.getCodec();
+ SocketClientProtocol protocol = connection.getSocketClientProtocol();
+ final Peer peer = connection.getPeer();
url = peer.getUrl();
try {
@@ -194,7 +212,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
transferFlowFiles(peer, protocol, context, session, codec);
} else {
- receiveFlowFiles(peer, protocol, context, session, codec);
+ final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+ if ( numReceived == 0 ) {
+ context.yield();
+ }
}
interruptLock.lock();
@@ -210,13 +231,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.commit();
- connectionState.setLastTimeUsed();
- connectionStatePool.offer(connectionState);
+ connection.setLastTimeUsed();
+ connectionPool.offer(connection);
} catch (final TransmissionDisabledException e) {
cleanup(protocol, peer);
session.rollback();
} catch (final Exception e) {
- connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
+ connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
logger.error(message);
@@ -261,12 +282,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- protocol.transferFlowFiles(peer, context, session, codec);
+ private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ return protocol.transferFlowFiles(peer, context, session, codec);
}
- private void receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- protocol.receiveFlowFiles(peer, context, session, codec);
+ private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ return protocol.receiveFlowFiles(peer, context, session, codec);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index dca1d84..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
- private final SSLSocketChannel channel;
- private final SSLSocketChannelInput request;
- private final SSLSocketChannelOutput response;
-
- public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
- super(uri);
- request = new SSLSocketChannelInput(channel);
- response = new SSLSocketChannelOutput(channel);
- this.channel = channel;
- }
-
- @Override
- public SSLSocketChannelInput getInput() {
- return request;
- }
-
- @Override
- public SSLSocketChannelOutput getOutput() {
- return response;
- }
-
- @Override
- public void setTimeout(final int millis) throws IOException {
- channel.setTimeout(millis);
- }
-
- @Override
- public int getTimeout() throws IOException {
- return channel.getTimeout();
- }
-
- @Override
- public void close() throws IOException {
- channel.close();
- }
-
- @Override
- public boolean isClosed() {
- return channel.isClosed();
- }
-
- @Override
- public boolean isDataAvailable() {
- try {
- return request.isDataAvailable();
- } catch (final Exception e) {
- return false;
- }
- }
-
- @Override
- public long getBytesWritten() {
- return response.getBytesWritten();
- }
-
- @Override
- public long getBytesRead() {
- return request.getBytesRead();
- }
-
- @Override
- public void interrupt() {
- channel.interrupt();
- }
-
- @Override
- public String toString() {
- return super.toString() + "[SSLSocketChannel=" + channel + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 60ef33f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
- private final SSLSocketChannelInputStream in;
- private final ByteCountingInputStream countingIn;
- private final InputStream bufferedIn;
-
- public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
- in = new SSLSocketChannelInputStream(socketChannel);
- countingIn = new ByteCountingInputStream(in);
- this.bufferedIn = new BufferedInputStream(countingIn);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return bufferedIn;
- }
-
- public boolean isDataAvailable() throws IOException {
- return bufferedIn.available() > 0;
- }
-
- @Override
- public long getBytesRead() {
- return countingIn.getBytesRead();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index dc3d68f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
- private final OutputStream out;
- private final ByteCountingOutputStream countingOut;
-
- public SSLSocketChannelOutput(final SSLSocketChannel channel) {
- countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
- out = new BufferedOutputStream(countingOut);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public long getBytesWritten() {
- return countingOut.getBytesWritten();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index eb22b0e..63c960d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -430,7 +430,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+ peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}