You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/21 08:18:48 UTC
[02/12] incubator-nifi git commit: NIFI-271 checkpoint push because
there are so many changes. Long way to go but got through dto library
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 021531f..67f28d2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -68,10 +68,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRootGroupPort extends AbstractPort implements RootGroupPort {
+
private static final String CATEGORY = "Site to Site";
-
+
private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class);
-
+
private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
private final ProcessScheduler processScheduler;
@@ -82,18 +83,18 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
private final EventReporter eventReporter;
private final ProcessScheduler scheduler;
private final Set<Relationship> relationships;
-
+
private final BlockingQueue<FlowFileRequest> requestQueue = new ArrayBlockingQueue<>(1000);
-
+
private final Set<FlowFileRequest> activeRequests = new HashSet<>();
private final Lock requestLock = new ReentrantLock();
private boolean shutdown = false; // guarded by requestLock
- public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup,
+ public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup,
final TransferDirection direction, final ConnectableType type, final UserService userService,
final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure) {
super(id, name, processGroup, type, scheduler);
-
+
this.processScheduler = scheduler;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
this.userService = userService;
@@ -110,20 +111,20 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
}
};
-
+
relationships = direction == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.<Relationship>emptySet();
}
-
+
@Override
public Collection<Relationship> getRelationships() {
- return relationships;
+ return relationships;
}
-
+
@Override
public boolean isTriggerWhenEmpty() {
return true;
}
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final FlowFileRequest flowFileRequest;
@@ -132,29 +133,29 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
} catch (final InterruptedException ie) {
return;
}
-
- if ( flowFileRequest == null ) {
+
+ if (flowFileRequest == null) {
return;
}
flowFileRequest.setServiceBegin();
-
+
requestLock.lock();
try {
- if ( shutdown ) {
+ if (shutdown) {
final CommunicationsSession commsSession = flowFileRequest.getPeer().getCommunicationsSession();
- if ( commsSession != null ) {
+ if (commsSession != null) {
commsSession.interrupt();
}
}
-
+
activeRequests.add(flowFileRequest);
} finally {
requestLock.unlock();
}
-
+
final ProcessSession session = sessionFactory.createSession();
-
+
try {
onTrigger(context, session, flowFileRequest);
// we leave the session open, because we send it back to the caller of #receiveFlowFile or #transmitFlowFile,
@@ -162,11 +163,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
} catch (final TransmissionDisabledException e) {
session.rollback();
} catch (final Exception e) {
- logger.error("{} Failed to process data due to {}", new Object[] {this, e});
- if ( logger.isDebugEnabled() ) {
+ logger.error("{} Failed to process data due to {}", new Object[]{this, e});
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
-
+
session.rollback();
} finally {
requestLock.lock();
@@ -177,50 +178,50 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
}
}
}
-
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// nothing to do here -- we will never get called because we override onTrigger(ProcessContext, ProcessSessionFactory)
}
-
+
private void onTrigger(final ProcessContext context, final ProcessSession session, final FlowFileRequest flowFileRequest) {
final ServerProtocol protocol = flowFileRequest.getProtocol();
final BlockingQueue<ProcessingResult> responseQueue = flowFileRequest.getResponseQueue();
- if ( flowFileRequest.isExpired() ) {
+ if (flowFileRequest.isExpired()) {
final String message = String.format("%s Cannot service request from %s because the request has timed out", this, flowFileRequest.getPeer());
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
-
+
responseQueue.add(new ProcessingResult(new RequestExpiredException()));
return;
}
-
+
final Peer peer = flowFileRequest.getPeer();
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final String sourceDn = commsSession.getUserDn();
logger.debug("{} Servicing request for {} (DN={})", this, peer, sourceDn);
-
+
final PortAuthorizationResult authorizationResult = checkUserAuthorization(sourceDn);
- if ( !authorizationResult.isAuthorized() ) {
- final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s",
- this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation());
+ if (!authorizationResult.isAuthorized()) {
+ final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s",
+ this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation());
logger.error(message);
eventReporter.reportEvent(Severity.ERROR, CATEGORY, message);
-
+
responseQueue.add(new ProcessingResult(new NotAuthorizedException(authorizationResult.getExplanation())));
return;
}
final FlowFileCodec codec = protocol.getPreNegotiatedCodec();
- if ( codec == null ) {
+ if (codec == null) {
responseQueue.add(new ProcessingResult(new BadRequestException("None of the supported FlowFile Codecs supplied is compatible with this instance")));
return;
}
final int transferCount;
-
+
try {
- if ( getConnectableType() == ConnectableType.INPUT_PORT ) {
+ if (getConnectableType() == ConnectableType.INPUT_PORT) {
transferCount = receiveFlowFiles(context, session, codec, flowFileRequest);
} else {
transferCount = transferFlowFiles(context, session, codec, flowFileRequest);
@@ -233,58 +234,55 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
} catch (final Exception e) {
session.rollback();
responseQueue.add(new ProcessingResult(e));
-
+
return;
}
-
+
session.commit();
responseQueue.add(new ProcessingResult(transferCount));
}
-
private int transferFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest request) throws IOException, ProtocolException {
return request.getProtocol().transferFlowFiles(request.getPeer(), context, session, codec);
}
-
private int receiveFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest receiveRequest) throws IOException, ProtocolException {
return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec);
}
-
+
@Override
public boolean isValid() {
return (getConnectableType() == ConnectableType.INPUT_PORT) ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true;
}
-
+
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
if (!isValid()) {
final ValidationResult error = new ValidationResult.Builder()
- .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
- .subject(String.format("Port '%s'", getName()))
- .valid(false)
- .build();
+ .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+ .subject(String.format("Port '%s'", getName()))
+ .valid(false)
+ .build();
validationErrors.add(error);
}
return validationErrors;
}
-
-
+
@Override
public boolean isTransmitting() {
- if ( !isRunning() ) {
+ if (!isRunning()) {
return false;
}
-
- if ( processScheduler.getActiveThreadCount(this) > 0 ) {
+
+ if (processScheduler.getActiveThreadCount(this) > 0) {
return true;
}
-
- if ( requestQueue.isEmpty() ) {
+
+ if (requestQueue.isEmpty()) {
return false;
}
-
+
requestLock.lock();
try {
return !activeRequests.isEmpty();
@@ -316,14 +314,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override
public void shutdown() {
super.shutdown();
-
+
requestLock.lock();
try {
this.shutdown = true;
-
- for ( final FlowFileRequest request : activeRequests ) {
+
+ for (final FlowFileRequest request : activeRequests) {
final CommunicationsSession commsSession = request.getPeer().getCommunicationsSession();
- if ( commsSession != null ) {
+ if (commsSession != null) {
commsSession.interrupt();
}
}
@@ -331,11 +329,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
requestLock.unlock();
}
}
-
+
@Override
public void onSchedulingStart() {
super.onSchedulingStart();
-
+
requestLock.lock();
try {
shutdown = false;
@@ -343,14 +341,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
requestLock.unlock();
}
}
-
+
@Override
public PortAuthorizationResult checkUserAuthorization(final String dn) {
- if ( !secure ) {
+ if (!secure) {
return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
}
- if ( dn == null ) {
+ if (dn == null) {
final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -359,9 +357,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
try {
final NiFiUser user = userService.checkAuthorization(dn);
-
+
final Set<Authority> authorities = user.getAuthorities();
- if ( !authorities.contains(Authority.ROLE_NIFI) ) {
+ if (!authorities.contains(Authority.ROLE_NIFI)) {
final String message = String.format("%s authorization failed for user %s because the user does not have Role NiFi", this, dn);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -369,12 +367,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
}
final Set<String> allowedUsers = userAccessControl.get();
- if ( allowedUsers.contains(dn) ) {
+ if (allowedUsers.contains(dn)) {
return new StandardPortAuthorizationResult(true, "User is Authorized");
}
final String userGroup = user.getUserGroup();
- if ( userGroup == null ) {
+ if (userGroup == null) {
final String message = String.format("%s authorization failed for user %s because the user does not have a group and is not in the set of Allowed Users for this Port", this, dn);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -383,13 +381,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
final Set<String> allowedGroups = groupAccessControl.get();
final boolean allowed = allowedGroups.contains(userGroup);
- if ( !allowed ) {
- final String message = String.format("%s authorization failed for user %s because the user is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn);
+ if (!allowed) {
+ final String message = String.format("%s authorization failed for user %s because the user "
+ + "is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn);
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + this.toString());
}
-
+
return new StandardPortAuthorizationResult(true, "User is part of group '" + userGroup + "', which is Authorized to communicate with " + this.toString());
} catch (final AccountNotFoundException anfe) {
final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn);
@@ -418,145 +417,145 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
return new StandardPortAuthorizationResult(false, "Authorization failed because " + e);
}
}
-
+
public static class StandardPortAuthorizationResult implements PortAuthorizationResult {
+
private final boolean isAuthorized;
private final String explanation;
-
+
public StandardPortAuthorizationResult(final boolean isAuthorized, final String explanation) {
this.isAuthorized = isAuthorized;
this.explanation = explanation;
}
-
+
@Override
public boolean isAuthorized() {
return isAuthorized;
}
-
+
@Override
public String getExplanation() {
return explanation;
}
}
-
-
+
private static class ProcessingResult {
+
private final int fileCount;
private final Exception problem;
-
+
public ProcessingResult(final int fileCount) {
this.fileCount = fileCount;
this.problem = null;
}
-
+
public ProcessingResult(final Exception problem) {
this.fileCount = 0;
this.problem = problem;
}
-
+
public Exception getProblem() {
return problem;
}
-
+
public int getFileCount() {
return fileCount;
}
}
-
-
private static class FlowFileRequest {
+
private final Peer peer;
private final ServerProtocol protocol;
private final BlockingQueue<ProcessingResult> queue;
private final long creationTime;
private final AtomicBoolean beingServiced = new AtomicBoolean(false);
-
+
public FlowFileRequest(final Peer peer, final ServerProtocol protocol) {
this.creationTime = System.currentTimeMillis();
this.peer = peer;
this.protocol = protocol;
this.queue = new ArrayBlockingQueue<>(1);
}
-
-
+
public void setServiceBegin() {
this.beingServiced.set(true);
}
-
+
public boolean isBeingServiced() {
return beingServiced.get();
}
-
+
public BlockingQueue<ProcessingResult> getResponseQueue() {
return queue;
}
-
+
public Peer getPeer() {
return peer;
}
-
+
public ServerProtocol getProtocol() {
return protocol;
}
-
+
public boolean isExpired() {
// use double the protocol's expiration because the sender may send data for a bit before
// the timeout starts being counted, and we don't want to timeout before the sender does.
// is this a good idea...???
long expiration = protocol.getRequestExpiration() * 2;
- if ( expiration <= 0L ) {
+ if (expiration <= 0L) {
return false;
}
-
+
if (expiration < 500L) {
expiration = 500L;
}
-
+
return System.currentTimeMillis() > creationTime + expiration;
}
}
-
@Override
- public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
- if ( getConnectableType() != ConnectableType.INPUT_PORT ) {
+ public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders)
+ throws NotAuthorizedException, BadRequestException, RequestExpiredException {
+ if (getConnectableType() != ConnectableType.INPUT_PORT) {
throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port");
}
- if ( !this.isRunning() ) {
+ if (!this.isRunning()) {
throw new IllegalStateException("Port not running");
}
-
+
try {
final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol);
- if ( !this.requestQueue.offer(request) ) {
+ if (!this.requestQueue.offer(request)) {
throw new RequestExpiredException();
}
-
+
// Trigger this port to run.
scheduler.registerEvent(this);
-
+
// Get a response from the response queue but don't wait forever if the port is stopped
ProcessingResult result = null;
-
+
// wait for the request to start getting serviced... and time out if it doesn't happen
// before the request expires
- while ( !request.isBeingServiced() ) {
- if ( request.isExpired() ) {
+ while (!request.isBeingServiced()) {
+ if (request.isExpired()) {
throw new SocketTimeoutException("Read timed out");
} else {
try {
Thread.sleep(100L);
- } catch (final InterruptedException e) {}
+ } catch (final InterruptedException e) {
+ }
}
}
// we've started to service the request. Now just wait until it's finished
result = request.getResponseQueue().take();
-
+
final Exception problem = result.getProblem();
- if ( problem == null ) {
+ if (problem == null) {
return result.getFileCount();
} else {
throw problem;
@@ -571,44 +570,46 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
}
@Override
- public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
- if ( getConnectableType() != ConnectableType.OUTPUT_PORT ) {
+ public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders)
+ throws NotAuthorizedException, BadRequestException, RequestExpiredException {
+ if (getConnectableType() != ConnectableType.OUTPUT_PORT) {
throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port");
}
-
- if ( !this.isRunning() ) {
+
+ if (!this.isRunning()) {
throw new IllegalStateException("Port not running");
}
try {
final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol);
- if ( !this.requestQueue.offer(request) ) {
+ if (!this.requestQueue.offer(request)) {
throw new RequestExpiredException();
}
// Trigger this port to run
scheduler.registerEvent(this);
-
+
// Get a response from the response queue but don't wait forever if the port is stopped
ProcessingResult result = null;
-
+
// wait for the request to start getting serviced... and time out if it doesn't happen
// before the request expires
- while ( !request.isBeingServiced() ) {
- if ( request.isExpired() ) {
+ while (!request.isBeingServiced()) {
+ if (request.isExpired()) {
throw new SocketTimeoutException("Read timed out");
- } else {
+ } else {
try {
Thread.sleep(100L);
- } catch (final InterruptedException e) {}
+ } catch (final InterruptedException e) {
+ }
}
}
// we've started to service the request. Now just wait until it's finished
result = request.getResponseQueue().take();
-
+
final Exception problem = result.getProblem();
- if ( problem == null ) {
+ if (problem == null) {
return result.getFileCount();
} else {
throw problem;
@@ -621,12 +622,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
throw new ProcessException(e);
}
}
-
+
@Override
public SchedulingStrategy getSchedulingStrategy() {
return SchedulingStrategy.TIMER_DRIVEN;
}
-
+
@Override
public boolean isSideEffectFree() {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
index 926809c..4a4f96b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
@@ -19,9 +19,10 @@ package org.apache.nifi.remote.exception;
import org.apache.nifi.remote.codec.FlowFileCodec;
public class UnsupportedCodecException extends RuntimeException {
- private static final long serialVersionUID = 198234789237L;
- public UnsupportedCodecException(final String codecName) {
+ private static final long serialVersionUID = 198234789237L;
+
+ public UnsupportedCodecException(final String codecName) {
super("Codec " + codecName + " is not supported");
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index 391d52b..7d0ffab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -40,12 +40,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClusterManagerServerProtocol implements ServerProtocol {
+
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class);
private NodeInformant nodeInformant;
-
+
private String commsIdentifier;
private boolean shutdown = false;
private boolean handshakeCompleted = false;
@@ -53,52 +54,51 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
public ClusterManagerServerProtocol() {
}
-
-
+
@Override
public void setNodeInformant(final NodeInformant nodeInformant) {
this.nodeInformant = nodeInformant;
}
-
+
@Override
public void handshake(final Peer peer) throws IOException, HandshakeException {
- if ( handshakeCompleted ) {
+ if (handshakeCompleted) {
throw new IllegalStateException("Handshake has already been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
+
// read communications identifier
commsIdentifier = dis.readUTF();
-
+
// read all of the properties. we don't really care what the properties are.
final int numProperties = dis.readInt();
- for (int i=0; i < numProperties; i++) {
+ for (int i = 0; i < numProperties; i++) {
final String propertyName = dis.readUTF();
final String propertyValue = dis.readUTF();
-
+
final HandshakeProperty property;
try {
property = HandshakeProperty.valueOf(propertyName);
- if ( HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property) ) {
+ if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) {
requestExpirationMillis = Long.parseLong(propertyValue);
}
} catch (final Exception e) {
}
}
-
+
// send "OK" response
ResponseCode.PROPERTIES_OK.writeResponse(dos);
-
+
logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier);
handshakeCompleted = true;
}
-
+
@Override
public boolean isHandshakeSuccessful() {
return handshakeCompleted;
@@ -106,10 +106,10 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
@Override
public void sendPeerList(final Peer peer) throws IOException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
@@ -118,29 +118,29 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation();
final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
-
+
// determine how many nodes have Site-to-site enabled
int numPeers = 0;
- for ( final NodeInformation nodeInfo : nodeInfos ) {
+ for (final NodeInformation nodeInfo : nodeInfos) {
if (nodeInfo.getSiteToSitePort() != null) {
numPeers++;
}
}
-
+
dos.writeInt(numPeers);
- for ( final NodeInformation nodeInfo : nodeInfos ) {
- if ( nodeInfo.getSiteToSitePort() == null ) {
+ for (final NodeInformation nodeInfo : nodeInfos) {
+ if (nodeInfo.getSiteToSitePort() == null) {
continue;
}
-
+
dos.writeUTF(nodeInfo.getHostname());
dos.writeInt(nodeInfo.getSiteToSitePort());
dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
dos.writeInt(nodeInfo.getTotalFlowFiles());
}
-
+
logger.info("Redirected {} to {} nodes", peer, numPeers);
-
+
dos.flush();
}
@@ -153,7 +153,7 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
public boolean isShutdown() {
return shutdown;
}
-
+
@Override
public FlowFileCodec negotiateCodec(Peer peer) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 21de646..b931e26 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -65,43 +65,42 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketFlowFileServerProtocol implements ServerProtocol {
+
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
-
+
private ProcessGroup rootGroup;
private String commsIdentifier;
private boolean handshakeCompleted;
-
+
private Boolean useGzip;
private long requestExpirationMillis;
private RootGroupPort port;
private boolean shutdown = false;
private FlowFileCodec negotiatedFlowFileCodec = null;
private String transitUriPrefix = null;
-
+
private int requestedBatchCount = 0;
private long requestedBatchBytes = 0L;
private long requestedBatchNanos = 0L;
private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
-
+
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
-
-
@Override
public void setRootProcessGroup(final ProcessGroup group) {
- if ( !group.isRootGroup() ) {
+ if (!group.isRootGroup()) {
throw new IllegalArgumentException();
}
this.rootGroup = group;
}
-
+
@Override
public void handshake(final Peer peer) throws IOException, HandshakeException {
- if ( handshakeCompleted ) {
+ if (handshakeCompleted) {
throw new IllegalStateException("Handshake has already been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
@@ -109,30 +108,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
+
commsIdentifier = dis.readUTF();
-
- if ( versionNegotiator.getVersion() >= 3 ) {
+
+ if (versionNegotiator.getVersion() >= 3) {
transitUriPrefix = dis.readUTF();
- if ( !transitUriPrefix.endsWith("/") ) {
+ if (!transitUriPrefix.endsWith("/")) {
transitUriPrefix = transitUriPrefix + "/";
}
}
-
+
final Map<String, String> properties = new HashMap<>();
final int numProperties = dis.readInt();
- for (int i=0; i < numProperties; i++) {
+ for (int i = 0; i < numProperties; i++) {
final String propertyName = dis.readUTF();
final String propertyValue = dis.readUTF();
properties.put(propertyName, propertyValue);
}
-
+
// evaluate the properties received
boolean responseWritten = false;
- for ( final Map.Entry<String, String> entry : properties.entrySet() ) {
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propertyName = entry.getKey();
final String value = entry.getValue();
-
+
final HandshakeProperty property;
try {
property = HandshakeProperty.valueOf(propertyName);
@@ -140,7 +139,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName);
throw new HandshakeException("Received unknown property: " + propertyName);
}
-
+
try {
switch (property) {
case GZIP: {
@@ -152,66 +151,66 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
break;
case BATCH_COUNT:
requestedBatchCount = Integer.parseInt(value);
- if ( requestedBatchCount < 0 ) {
+ if (requestedBatchCount < 0) {
throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value);
}
break;
case BATCH_SIZE:
requestedBatchBytes = Long.parseLong(value);
- if ( requestedBatchBytes < 0 ) {
+ if (requestedBatchBytes < 0) {
throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value);
}
break;
case BATCH_DURATION:
requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
- if ( requestedBatchNanos < 0 ) {
+ if (requestedBatchNanos < 0) {
throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value);
}
break;
case PORT_IDENTIFIER: {
Port receivedPort = rootGroup.getInputPort(value);
- if ( receivedPort == null ) {
+ if (receivedPort == null) {
receivedPort = rootGroup.getOutputPort(value);
}
- if ( receivedPort == null ) {
+ if (receivedPort == null) {
logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
ResponseCode.UNKNOWN_PORT.writeResponse(dos);
throw new HandshakeException("Received unknown port identifier: " + value);
}
- if ( !(receivedPort instanceof RootGroupPort) ) {
+ if (!(receivedPort instanceof RootGroupPort)) {
logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
ResponseCode.UNKNOWN_PORT.writeResponse(dos);
throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
}
-
+
this.port = (RootGroupPort) receivedPort;
final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
- if ( !portAuthResult.isAuthorized() ) {
+ if (!portAuthResult.isAuthorized()) {
logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
responseWritten = true;
break;
}
-
- if ( !receivedPort.isValid() ) {
+
+ if (!receivedPort.isValid()) {
logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
responseWritten = true;
break;
}
-
- if ( !receivedPort.isRunning() ) {
+
+ if (!receivedPort.isRunning()) {
logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
responseWritten = true;
break;
}
-
+
// PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this;
// we will simply not service the request, and the sender will time out
- if ( getVersionNegotiator().getVersion() > 1 ) {
- for ( final Connection connection : port.getConnections() ) {
- if ( connection.getFlowFileQueue().isFull() ) {
+ if (getVersionNegotiator().getVersion() > 1) {
+ for (final Connection connection : port.getConnections()) {
+ if (connection.getFlowFileQueue().isFull()) {
logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
responseWritten = true;
@@ -219,7 +218,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
}
}
}
-
+
break;
}
}
@@ -227,54 +226,54 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value);
}
}
-
- if ( useGzip == null ) {
+
+ if (useGzip == null) {
logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
}
-
+
// send "OK" response
- if ( !responseWritten ) {
+ if (!responseWritten) {
ResponseCode.PROPERTIES_OK.writeResponse(dos);
}
-
+
logger.debug("{} Finished handshake with {}", this, peer);
handshakeCompleted = true;
}
-
+
/**
 * Reports whether {@code handshake(Peer)} has run to completion on this protocol instance.
 * <p>
 * The flag returned here is set to {@code true} on the last line of {@code handshake}, so it
 * stays {@code false} if that method threw. NOTE(review): in the visible code the flag is also
 * set when the handshake answered the peer with a non-OK response code (for example
 * UNAUTHORIZED) without throwing -- confirm whether callers rely on that.
 *
 * @return the current value of the {@code handshakeCompleted} field
 */
@Override
public boolean isHandshakeSuccessful() {
return handshakeCompleted;
}
-
+
/**
 * Returns the port selected during the handshake.
 * <p>
 * The field is assigned while {@code handshake} processes the PORT_IDENTIFIER property;
 * {@code negotiateCodec} explicitly checks it for {@code null}, so callers should expect
 * {@code null} when no port identifier was received.
 *
 * @return the {@link RootGroupPort} stored in the {@code port} field, possibly {@code null}
 */
@Override
public RootGroupPort getPort() {
return port;
}
-
+
@Override
public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
- logger.debug("{} Negotiating Codec with {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()});
+ logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
- if ( port == null ) {
- RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
+
+ if (port == null) {
+ RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
}
-
+
// Negotiate the FlowFileCodec to use.
try {
negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
- logger.debug("{} Negotiated Codec {} with {}", new Object[] {this, negotiatedFlowFileCodec, peer});
+ logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, negotiatedFlowFileCodec, peer});
return negotiatedFlowFileCodec;
} catch (final HandshakeException e) {
throw new ProtocolException(e.toString());
@@ -286,13 +285,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
return negotiatedFlowFileCodec;
}
-
@Override
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
@@ -301,22 +299,22 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
String remoteDn = commsSession.getUserDn();
- if ( remoteDn == null ) {
+ if (remoteDn == null) {
remoteDn = "none";
}
FlowFile flowFile = session.get();
- if ( flowFile == null ) {
+ if (flowFile == null) {
// we have no data to send. Notify the peer.
logger.debug("{} No data to send to {}", this, peer);
ResponseCode.NO_MORE_DATA.writeResponse(dos);
return 0;
}
-
+
// we have data to send.
logger.debug("{} Data is available to send to {}", this, peer);
ResponseCode.MORE_DATA.writeResponse(dos);
-
+
final StopWatch stopWatch = new StopWatch(true);
long bytesSent = 0L;
final Set<FlowFile> flowFilesSent = new HashSet<>();
@@ -328,27 +326,27 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
String calculatedCRC = "";
while (continueTransaction) {
final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos;
- logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer});
-
+ logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
+
final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
final StopWatch transferWatch = new StopWatch(true);
-
+
final FlowFile toSend = flowFile;
session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
- codec.encode(dataPacket, checkedOutputStream);
- }
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+ codec.encode(dataPacket, checkedOutputStream);
+ }
});
-
+
final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
-
+
// need to close the CompressionOutputStream in order to force it write out any remaining bytes.
// Otherwise, do NOT close it because we don't want to close the underlying stream
// (CompressionOutputStream will not close the underlying stream when it's closed)
- if ( useGzip ) {
+ if (useGzip) {
checkedOutputStream.close();
}
@@ -358,33 +356,33 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
session.remove(flowFile);
-
+
// determine if we should check for more data on queue.
final long sendingNanos = System.nanoTime() - startNanos;
boolean poll = true;
- if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) {
+ if (sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L) {
poll = false;
}
- if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) {
+ if (bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L) {
poll = false;
}
- if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) {
+ if (flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0) {
poll = false;
}
-
- if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) {
+
+ if (requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0) {
poll = (sendingNanos < DEFAULT_BATCH_NANOS);
}
-
- if ( poll ) {
+
+ if (poll) {
// we've not elapsed the requested sending duration, so get more data.
flowFile = session.get();
} else {
flowFile = null;
}
-
+
continueTransaction = (flowFile != null);
- if ( continueTransaction ) {
+ if (continueTransaction) {
logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
} else {
@@ -393,19 +391,21 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue());
}
}
-
+
// we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
final Response transactionConfirmationResponse = Response.read(dis);
- if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) {
// Confirm Checksum and echo back the confirmation.
logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer);
final String receivedCRC = transactionConfirmationResponse.getMessage();
- if ( versionNegotiator.getVersion() > 3 ) {
- if ( !receivedCRC.equals(calculatedCRC) ) {
+ if (versionNegotiator.getVersion() > 3) {
+ if (!receivedCRC.equals(calculatedCRC)) {
ResponseCode.BAD_CHECKSUM.writeResponse(dos);
session.rollback();
- throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as "
+ + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC
+ + "; canceling transaction and rolling back session");
}
}
@@ -415,61 +415,60 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
}
final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-
+
final Response transactionResponse;
try {
transactionResponse = Response.read(dis);
} catch (final IOException e) {
- logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
- " It is unknown whether or not the peer successfully received/processed the data." +
- " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
- this, peer, session, flowFileDescription);
+ logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator."
+ + " It is unknown whether or not the peer successfully received/processed the data."
+ + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
+ this, peer, session, flowFileDescription);
session.rollback();
throw e;
}
-
- logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
- if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+
+ logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer});
+ if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
- } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}
-
+
session.commit();
-
+
stopWatch.stop();
final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesSent);
- logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
return flowFilesSent.size();
}
-
-
+
@Override
public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
logger.debug("{} receiving FlowFiles from {}", this, peer);
-
+
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
String remoteDn = commsSession.getUserDn();
- if ( remoteDn == null ) {
+ if (remoteDn == null) {
remoteDn = "none";
}
final StopWatch stopWatch = new StopWatch(true);
final CRC32 crc = new CRC32();
-
+
// Peer has data. Otherwise, we would not have been called, because they would not have sent
// a SEND_FLOWFILES request to use. Just decode the bytes into FlowFiles until peer says he's
// finished sending data.
@@ -486,18 +485,19 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
FlowFile flowFile = session.create();
flowFile = session.importFrom(dataPacket.getData(), flowFile);
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
-
+
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-
+
final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
- session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
+ session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
+ ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
session.transfer(flowFile, Relationship.ANONYMOUS);
flowFilesReceived.add(flowFile);
bytesReceived += flowFile.getSize();
-
+
final Response transactionResponse = Response.read(dis);
switch (transactionResponse.getCode()) {
case CONTINUE_TRANSACTION:
@@ -516,7 +516,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse);
}
}
-
+
// we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
// to peer so that we can verify that the connection is still open. This is a two-phase commit,
// which helps to prevent the chances of data duplication. Without doing this, we may commit the
@@ -526,7 +526,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
// time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-
+
final Response confirmTransactionResponse = Response.read(dis);
logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer);
@@ -539,11 +539,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
default:
throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
}
-
+
// Commit the session so that we have persisted the data
session.commit();
-
- if ( context.getAvailableRelationships().isEmpty() ) {
+
+ if (context.getAvailableRelationships().isEmpty()) {
// Confirm that we received the data and the peer can now discard it but that the peer should not
// send any more data for a bit
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
@@ -553,30 +553,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
}
-
+
stopWatch.stop();
final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
return flowFilesReceived.size();
}
-
+
@Override
public RequestType getRequestType(final Peer peer) throws IOException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
- logger.debug("{} Reading Request Type from {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()});
+ logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()));
- logger.debug("{} Got Request Type {} from {}", new Object[] {this, requestType, peer});
+ logger.debug("{} Got Request Type {} from {}", new Object[]{this, requestType, peer});
return requestType;
}
@@ -599,10 +599,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
@Override
public void sendPeerList(final Peer peer) throws IOException {
- if ( !handshakeCompleted ) {
+ if (!handshakeCompleted) {
throw new IllegalStateException("Handshake has not been completed");
}
- if ( shutdown ) {
+ if (shutdown) {
throw new IllegalStateException("Protocol is shutdown");
}
@@ -611,7 +611,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
final NiFiProperties properties = NiFiProperties.getInstance();
-
+
// we have only 1 peer: ourselves.
dos.writeInt(1);
dos.writeUTF(InetAddress.getLocalHost().getHostName());
@@ -620,12 +620,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host.
dos.flush();
}
-
+
/**
 * Returns the name under which this protocol is identified.
 *
 * @return the value of the {@code RESOURCE_NAME} constant
 */
@Override
public String getResourceName() {
return RESOURCE_NAME;
}
-
+
/**
 * Accepts a {@link NodeInformant} but does nothing with it: the method body is empty.
 * <p>
 * NOTE(review): presumably a no-op because {@code sendPeerList} only ever advertises this
 * host as a peer -- confirm against the {@code ServerProtocol} contract.
 *
 * @param nodeInformant ignored by this implementation
 */
@Override
public void setNodeInformant(final NodeInformant nodeInformant) {
}
@@ -634,7 +634,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
public long getRequestExpiration() {
return requestExpirationMillis;
}
-
+
@Override
public String toString() {
return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]";
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
index b9a567b..8380f8b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
@@ -39,13 +39,13 @@ package org.apache.nifi.remote;
// final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
// final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
// final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
-//
+//
// final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
// final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
-//
+//
// destinationMap.put(node1, node1Destination);
// destinationMap.put(node2, node2Destination);
-//
+//
// final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
// int node1Count = 0, node2Count = 0;
// for ( final Destination destination : destinations ) {
@@ -57,30 +57,30 @@ package org.apache.nifi.remote;
// Assert.fail("Got Destination for unknkown NodeInformation");
// }
// }
-//
+//
// System.out.println(node1Count);
// System.out.println(node2Count);
-//
+//
// final double node1Pct = (double) node1Count / (double) (node1Count + node2Count);
// assertEquals(0.80, node1Pct, 0.01);
-// // node1 should get the most but is not allowed to have more than approximately 80% of the data.
+// // node1 should get the most but is not allowed to have more than approximately 80% of the data.
// }
-//
+//
// @Test
// public void testWeightedDistributionWithThreeNodes() throws IOException {
// final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
// final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
// final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
// final NodeInformation node3 = new NodeInformation("hostC", 80, 90, true, 500);
-//
+//
// final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
// final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
// final Destination node3Destination = new Destination(createRemoteGroupPort("PortC"), null, node3, TransferDirection.SEND, true, null);
-//
+//
// destinationMap.put(node1, node1Destination);
// destinationMap.put(node2, node2Destination);
// destinationMap.put(node3, node3Destination);
-//
+//
// final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
// int node1Count = 0, node2Count = 0, node3Count = 0;
// for ( final Destination destination : destinations ) {
@@ -94,20 +94,20 @@ package org.apache.nifi.remote;
// Assert.fail("Got Destination for unknkown NodeInformation");
// }
// }
-//
+//
// System.out.println(node1Count);
// System.out.println(node2Count);
// System.out.println(node3Count);
-//
+//
// final double node1Pct = (double) node1Count / (double) (node1Count + node2Count + node3Count);
// final double node2Pct = (double) node2Count / (double) (node1Count + node2Count + node3Count);
// final double node3Pct = (double) node3Count / (double) (node1Count + node2Count + node3Count);
-//
+//
// assertEquals(0.5, node1Pct, 0.02);
// assertEquals(0.25, node2Pct, 0.02);
// assertEquals(node2Pct, node3Pct, 0.02);
// }
-//
+//
// private RemoteGroupPort createRemoteGroupPort(final String portName) {
// RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
// Mockito.when(port.getName()).thenReturn(portName);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
index 4e55f5f..03f8190 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
@@ -65,7 +65,7 @@ package org.apache.nifi.remote.io.socket;
// System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000));
// channel.configureBlocking(false);
-//
+//
// final CommunicationsSession commsSession;
// commsSession = new SocketChannelCommunicationsSession(channel, "", null);
// commsSession.setUri("nifi://localhost:5000");
@@ -74,7 +74,7 @@ package org.apache.nifi.remote.io.socket;
//
// dos.write(CommunicationsProtocol.MAGIC_BYTES);
// dos.flush();
-//
+//
// final EventReporter eventReporter = Mockito.mock(EventReporter.class);
// final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, NiFiProperties.getInstance());
//
@@ -84,20 +84,20 @@ package org.apache.nifi.remote.io.socket;
// final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class);
// Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000);
// Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") );
-//
+//
// final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
// Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a");
// Mockito.when(port.getName()).thenReturn("Data In");
// Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg);
-//
+//
// negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND);
// }
-//
+//
// @Test
// public void testInputOutputStreams() throws IOException, InterruptedException {
// final ServerThread server = new ServerThread();
// server.start();
-//
+//
// int port = server.getPort();
// while ( port <= 0 ) {
// Thread.sleep(10L);
@@ -106,11 +106,11 @@ package org.apache.nifi.remote.io.socket;
//
// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));
// channel.configureBlocking(false);
-//
+//
// final OutputStream out = new SocketChannelOutputStream(channel);
// final InputStream in = new SocketChannelInputStream(channel);
// final DataInputStream dataIn = new DataInputStream(in);
-//
+//
// final byte[] sent = new byte[DATA_SIZE];
// for (int i=0; i < sent.length; i++) {
// sent[i] = (byte) (i % 255);
@@ -125,21 +125,21 @@ package org.apache.nifi.remote.io.socket;
// final float megabytes = (float) DATA_SIZE / (1024F * 1024F);
// final float MBperS = megabytes / seconds;
// System.out.println("Millis: " + millis + "; MB/s: " + MBperS);
-//
+//
// Thread.sleep(2500L);
// final byte[] received = server.getReceivedData();
// System.out.println("Server received " + received.length + " bytes");
// server.clearReceivedData();
// assertTrue(Arrays.equals(sent, received));
-//
+//
// final long val = dataIn.readLong();
// assertEquals(DATA_SIZE, val);
// System.out.println(val);
// }
-//
+//
// server.shutdown();
// }
-//
+//
// public final long toLong(final byte[] buffer) throws IOException {
// return (((long)buffer[0] << 56) +
// ((long)(buffer[1] & 255) << 48) +
@@ -150,82 +150,82 @@ package org.apache.nifi.remote.io.socket;
// ((buffer[6] & 255) << 8) +
// ((buffer[7] & 255) << 0));
// }
-//
+//
// private static class ServerThread extends Thread {
// private int listeningPort;
// private final ByteArrayOutputStream received = new ByteArrayOutputStream();
-//
+//
// private volatile int readingDelay = 0;
// private volatile boolean shutdown = false;
-//
+//
// public ServerThread() {
// }
-//
+//
// public int getPort() {
// return listeningPort;
// }
-//
+//
// public byte[] getReceivedData() {
// return received.toByteArray();
// }
-//
+//
// @Override
// public void run() {
// try {
// final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
// final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0);
// this.listeningPort = serverSocket.getLocalPort();
-//
+//
// final Socket socket = serverSocket.accept();
// final InputStream stream = socket.getInputStream();
// final DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
-//
+//
// final byte[] buffer = new byte[4096];
// int len;
-//
+//
// while (!shutdown) {
// try {
// len = stream.read(buffer);
-//
+//
// System.out.println("Received " + len + " bytes");
-//
+//
// if ( readingDelay > 0 ) {
// try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {}
// }
// } catch (final SocketTimeoutException e) {
// continue;
// }
-//
+//
// if ( len < 0 ) {
// return;
// }
-//
+//
// received.write(buffer, 0, len);
-//
+//
// final long length = received.size();
// if ( length % (DATA_SIZE) == 0 ) {
// dos.writeLong(length);
// dos.flush();
// }
// }
-//
+//
// System.out.println("Server successfully shutdown");
// } catch (final Exception e) {
// e.printStackTrace();
// }
// }
-//
+//
// public void clearReceivedData() {
// this.received.reset();
// }
-//
+//
// public void shutdown() {
// this.shutdown = true;
// }
-//
+//
// public void delayReading(final int millis) {
// this.readingDelay = millis;
// }
// }
-//
+//
//}