You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:49 UTC
[41/51] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
deleted file mode 100644
index 10e348d..0000000
--- a/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ /dev/null
@@ -1,876 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class NiFiProperties extends Properties {
-
- private static final long serialVersionUID = 2119177359005492702L;
-
- private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class);
- private static NiFiProperties instance = null;
-
- // core properties
- public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path";
- public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file";
- public static final String FLOW_CONFIGURATION_ARCHIVE_FILE = "nifi.flow.configuration.archive.file";
- public static final String TASK_CONFIGURATION_FILE = "nifi.reporting.task.configuration.file";
- public static final String SERVICE_CONFIGURATION_FILE = "nifi.controller.service.configuration.file";
- public static final String AUTHORITY_PROVIDER_CONFIGURATION_FILE = "nifi.authority.provider.configuration.file";
- public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory";
- public static final String RESTORE_DIRECTORY = "nifi.restore.directory";
- public static final String VERSION = "nifi.version";
- public static final String WRITE_DELAY_INTERVAL = "nifi.flowservice.writedelay.interval";
- public static final String AUTO_RESUME_STATE = "nifi.flowcontroller.autoResumeState";
- public static final String FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.period";
- public static final String NAR_LIBRARY_DIRECTORY = "nifi.nar.library.directory";
- public static final String NAR_WORKING_DIRECTORY = "nifi.nar.working.directory";
- public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory";
- public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key";
- public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm";
- public static final String SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider";
- public static final String H2_URL_APPEND = "nifi.h2.url.append";
- public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port";
- public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure";
- public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
- public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
- public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
-
- // content repository properties
- public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
- public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation";
- public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size";
- public static final String MAX_FLOWFILES_PER_CLAIM = "nifi.content.claim.max.flow.files";
- public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period";
- public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage";
- public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage";
- public static final String CONTENT_ARCHIVE_ENABLED = "nifi.content.repository.archive.enabled";
- public static final String CONTENT_ARCHIVE_CLEANUP_FREQUENCY = "nifi.content.repository.archive.cleanup.frequency";
- public static final String CONTENT_VIEWER_URL = "nifi.content.viewer.url";
-
- // flowfile repository properties
- public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation";
- public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync";
- public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory";
- public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions";
- public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
- public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
- public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
- public static final String SWAP_IN_THREADS = "nifi.swap.in.threads";
- public static final String SWAP_IN_PERIOD = "nifi.swap.in.period";
- public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads";
- public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period";
-
- // provenance properties
- public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation";
- public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory.";
- public static final String PROVENANCE_MAX_STORAGE_TIME = "nifi.provenance.repository.max.storage.time";
- public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size";
- public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
- public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
- public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
- public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
- public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
- public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
- public static final String PROVENANCE_INDEX_SHARD_SIZE = "nifi.provenance.repository.index.shard.size";
- public static final String PROVENANCE_JOURNAL_COUNT = "nifi.provenance.repository.journal.count";
-
- // component status repository properties
- public static final String COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION = "nifi.components.status.repository.implementation";
- public static final String COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "nifi.components.status.snapshot.frequency";
-
- // encryptor properties
- public static final String NF_SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key";
- public static final String NF_SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm";
- public static final String NF_SENSITIVE_PROPS_PROVIDER = "nifi.sensitive.props.provider";
-
- // security properties
- public static final String SECURITY_KEYSTORE = "nifi.security.keystore";
- public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType";
- public static final String SECURITY_KEYSTORE_PASSWD = "nifi.security.keystorePasswd";
- public static final String SECURITY_KEY_PASSWD = "nifi.security.keyPasswd";
- public static final String SECURITY_TRUSTSTORE = "nifi.security.truststore";
- public static final String SECURITY_TRUSTSTORE_TYPE = "nifi.security.truststoreType";
- public static final String SECURITY_TRUSTSTORE_PASSWD = "nifi.security.truststorePasswd";
- public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.security.needClientAuth";
- public static final String SECURITY_USER_AUTHORITY_PROVIDER = "nifi.security.user.authority.provider";
- public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_PORT = "nifi.security.cluster.authority.provider.port";
- public static final String SECURITY_CLUSTER_AUTHORITY_PROVIDER_THREADS = "nifi.security.cluster.authority.provider.threads";
- public static final String SECURITY_USER_CREDENTIAL_CACHE_DURATION = "nifi.security.user.credential.cache.duration";
- public static final String SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS = "nifi.security.support.new.account.requests";
- public static final String SECURITY_DEFAULT_USER_ROLES = "nifi.security.default.user.roles";
- public static final String SECURITY_OCSP_RESPONDER_URL = "nifi.security.ocsp.responder.url";
- public static final String SECURITY_OCSP_RESPONDER_CERTIFICATE = "nifi.security.ocsp.responder.certificate";
-
- // web properties
- public static final String WEB_WAR_DIR = "nifi.web.war.directory";
- public static final String WEB_HTTP_PORT = "nifi.web.http.port";
- public static final String WEB_HTTP_HOST = "nifi.web.http.host";
- public static final String WEB_HTTPS_PORT = "nifi.web.https.port";
- public static final String WEB_HTTPS_HOST = "nifi.web.https.host";
- public static final String WEB_WORKING_DIR = "nifi.web.jetty.working.directory";
- public static final String WEB_THREADS = "nifi.web.jetty.threads";
-
- // ui properties
- public static final String UI_BANNER_TEXT = "nifi.ui.banner.text";
- public static final String UI_AUTO_REFRESH_INTERVAL = "nifi.ui.autorefresh.interval";
-
- // cluster common properties
- public static final String CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "nifi.cluster.protocol.heartbeat.interval";
- public static final String CLUSTER_PROTOCOL_IS_SECURE = "nifi.cluster.protocol.is.secure";
- public static final String CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "nifi.cluster.protocol.socket.timeout";
- public static final String CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "nifi.cluster.protocol.connection.handshake.timeout";
- public static final String CLUSTER_PROTOCOL_USE_MULTICAST = "nifi.cluster.protocol.use.multicast";
- public static final String CLUSTER_PROTOCOL_MULTICAST_ADDRESS = "nifi.cluster.protocol.multicast.address";
- public static final String CLUSTER_PROTOCOL_MULTICAST_PORT = "nifi.cluster.protocol.multicast.port";
- public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "nifi.cluster.protocol.multicast.service.broadcast.delay";
- public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = "nifi.cluster.protocol.multicast.service.locator.attempts";
- public static final String CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "nifi.cluster.protocol.multicast.service.locator.attempts.delay";
-
- // cluster node properties
- public static final String CLUSTER_IS_NODE = "nifi.cluster.is.node";
- public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
- public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
- public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
- public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
- public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";
-
- // cluster manager properties
- public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
- public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
- public static final String CLUSTER_MANAGER_PROTOCOL_PORT = "nifi.cluster.manager.protocol.port";
- public static final String CLUSTER_MANAGER_NODE_FIREWALL_FILE = "nifi.cluster.manager.node.firewall.file";
- public static final String CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = "nifi.cluster.manager.node.event.history.size";
- public static final String CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "nifi.cluster.manager.node.api.connection.timeout";
- public static final String CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "nifi.cluster.manager.node.api.read.timeout";
- public static final String CLUSTER_MANAGER_NODE_API_REQUEST_THREADS = "nifi.cluster.manager.node.api.request.threads";
- public static final String CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "nifi.cluster.manager.flow.retrieval.delay";
- public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = "nifi.cluster.manager.protocol.threads";
- public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = "nifi.cluster.manager.safemode.duration";
-
- // defaults
- public static final String DEFAULT_TITLE = "NiFi";
- public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
- public static final String DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE = "conf/authority-providers.xml";
- public static final String DEFAULT_USER_CREDENTIAL_CACHE_DURATION = "24 hours";
- public static final Integer DEFAULT_REMOTE_INPUT_PORT = null;
- public static final Path DEFAULT_TEMPLATE_DIRECTORY = Paths.get("conf", "templates");
- public static final int DEFAULT_WEB_THREADS = 200;
- public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
- public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar";
- public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components";
- public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
- public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256";
- public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min";
- public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100;
- public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
- public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap";
- public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec";
- public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec";
- public static final int DEFAULT_SWAP_IN_THREADS = 4;
- public static final int DEFAULT_SWAP_OUT_THREADS = 4;
- public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
- public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
- public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
-
- // cluster common defaults
- public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
- public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms";
- public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3;
- public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
- public static final String DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT = "30 sec";
- public static final String DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT = "45 sec";
-
- // cluster node defaults
- public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
-
- // cluster manager defaults
- public static final int DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE = 10;
- public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT = "30 sec";
- public static final String DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT = "30 sec";
- public static final int DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS = 10;
- public static final String DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY = "5 sec";
- public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10;
- public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec";
-
- private NiFiProperties() {
- super();
- }
-
- /**
- * This is the method through which the NiFiProperties object should be
- * obtained.
- *
- * @return the NiFiProperties object to use
- * @throws RuntimeException if unable to load properties file
- */
- public static synchronized NiFiProperties getInstance() {
- if (null == instance) {
- final NiFiProperties suspectInstance = new NiFiProperties();
- final String nfPropertiesFilePath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH);
- if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) {
- throw new RuntimeException("Requires a system property called \'" + NiFiProperties.PROPERTIES_FILE_PATH + "\' and this is not set or has no value");
- }
- final File propertiesFile = new File(nfPropertiesFilePath);
- if (!propertiesFile.exists()) {
- throw new RuntimeException("Properties file doesn't exist \'" + propertiesFile.getAbsolutePath() + "\'");
- }
- if (!propertiesFile.canRead()) {
- throw new RuntimeException("Properties file exists but cannot be read \'" + propertiesFile.getAbsolutePath() + "\'");
- }
- InputStream inStream = null;
- try {
- inStream = new BufferedInputStream(new FileInputStream(propertiesFile));
- suspectInstance.load(inStream);
- } catch (final Exception ex) {
- LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage());
- throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex);
- } finally {
- if (null != inStream) {
- try {
- inStream.close();
- } catch (final Exception ex) {
- /**
- * do nothing *
- */
- }
- }
- }
- instance = suspectInstance;
- }
- return instance;
- }
-
- // getters for core properties //
- public File getFlowConfigurationFile() {
- try {
- return new File(getProperty(FLOW_CONFIGURATION_FILE));
- } catch (Exception ex) {
- return null;
- }
- }
-
- public File getFlowConfigurationFileDir() {
- try {
- return getFlowConfigurationFile().getParentFile();
- } catch (Exception ex) {
- return null;
- }
- }
-
- private Integer getPropertyAsPort(final String propertyName, final Integer defaultValue) {
- final String port = getProperty(propertyName);
- if (StringUtils.isEmpty(port)) {
- return defaultValue;
- }
- try {
- final int val = Integer.parseInt(port);
- if (val <= 0 || val > 65535) {
- throw new RuntimeException("Valid port range is 0 - 65535 but got " + val);
- }
- return val;
- } catch (final NumberFormatException e) {
- return defaultValue;
- }
- }
-
- public int getQueueSwapThreshold() {
- final String thresholdValue = getProperty(QUEUE_SWAP_THRESHOLD);
- if (thresholdValue == null) {
- return DEFAULT_QUEUE_SWAP_THRESHOLD;
- }
-
- try {
- return Integer.parseInt(thresholdValue);
- } catch (final NumberFormatException e) {
- return DEFAULT_QUEUE_SWAP_THRESHOLD;
- }
- }
-
- public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
- final String value = getProperty(propertyName);
- if (value == null) {
- return defaultValue;
- }
-
- try {
- return Integer.parseInt(getProperty(propertyName));
- } catch (final Exception e) {
- return defaultValue;
- }
- }
-
- public int getSwapInThreads() {
- return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS);
- }
-
- public int getSwapOutThreads() {
- final String value = getProperty(SWAP_OUT_THREADS);
- if (value == null) {
- return DEFAULT_SWAP_OUT_THREADS;
- }
-
- try {
- return Integer.parseInt(getProperty(SWAP_OUT_THREADS));
- } catch (final Exception e) {
- return DEFAULT_SWAP_OUT_THREADS;
- }
- }
-
- public String getSwapInPeriod() {
- return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD);
- }
-
- public String getSwapOutPeriod() {
- return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD);
- }
-
- public String getAdministrativeYieldDuration() {
- return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION);
- }
-
- /**
- * The socket port to listen on for a Remote Input Port.
- *
- * @return
- */
- public Integer getRemoteInputPort() {
- return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT);
- }
-
- /**
- * @return False if property value is 'false'; True otherwise.
- */
- public Boolean isSiteToSiteSecure() {
- final String secureVal = getProperty(SITE_TO_SITE_SECURE, "true");
-
- if ("false".equalsIgnoreCase(secureVal)) {
- return false;
- }else{
- return true;
- }
-
- }
-
- /**
- * Returns the directory to which Templates are to be persisted
- *
- * @return
- */
- public Path getTemplateDirectory() {
- final String strVal = getProperty(TEMPLATE_DIRECTORY);
- return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal);
- }
-
- /**
- * Get the flow service write delay.
- *
- * @return The write delay
- */
- public String getFlowServiceWriteDelay() {
- return getProperty(WRITE_DELAY_INTERVAL);
- }
-
- /**
- * Returns whether the processors should be started automatically when the
- * application loads.
- *
- * @return Whether to auto start the processors or not
- */
- public boolean getAutoResumeState() {
- final String rawAutoResumeState = getProperty(AUTO_RESUME_STATE, DEFAULT_AUTO_RESUME_STATE.toString());
- return Boolean.parseBoolean(rawAutoResumeState);
- }
-
- /**
- * Returns the number of partitions that should be used for the FlowFile
- * Repository
- *
- * @return
- */
- public int getFlowFileRepositoryPartitions() {
- final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS, DEFAULT_FLOWFILE_REPO_PARTITIONS);
- return Integer.parseInt(rawProperty);
- }
-
- /**
- * Returns the number of milliseconds between FlowFileRepository
- * checkpointing
- *
- * @return
- */
- public String getFlowFileRepositoryCheckpointInterval() {
- return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL);
- }
-
- /**
- * @return the restore directory or null if not configured
- */
- public File getRestoreDirectory() {
- final String value = getProperty(RESTORE_DIRECTORY);
- if (StringUtils.isBlank(value)) {
- return null;
- } else {
- return new File(value);
- }
- }
-
- /**
- * @return the user authorities file
- */
- public File getAuthorityProviderConfiguraitonFile() {
- final String value = getProperty(AUTHORITY_PROVIDER_CONFIGURATION_FILE);
- if (StringUtils.isBlank(value)) {
- return new File(DEFAULT_AUTHORITY_PROVIDER_CONFIGURATION_FILE);
- } else {
- return new File(value);
- }
- }
-
- /**
- * Will default to true unless the value is explicitly set to false.
- *
- * @return Whether client auth is required
- */
- public boolean getNeedClientAuth() {
- boolean needClientAuth = true;
- String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH);
- if ("false".equalsIgnoreCase(rawNeedClientAuth)) {
- needClientAuth = false;
- }
- return needClientAuth;
- }
-
- public String getUserCredentialCacheDuration() {
- return getProperty(SECURITY_USER_CREDENTIAL_CACHE_DURATION, DEFAULT_USER_CREDENTIAL_CACHE_DURATION);
- }
-
- public boolean getSupportNewAccountRequests() {
- boolean shouldSupport = true;
- String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS);
- if ("false".equalsIgnoreCase(rawShouldSupport)) {
- shouldSupport = false;
- }
- return shouldSupport;
- }
-
- // getters for web properties //
- public Integer getPort() {
- Integer port = null;
- try {
- port = Integer.parseInt(getProperty(WEB_HTTP_PORT));
- } catch (NumberFormatException nfe) {
- }
- return port;
- }
-
- public Integer getSslPort() {
- Integer sslPort = null;
- try {
- sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT));
- } catch (NumberFormatException nfe) {
- }
- return sslPort;
- }
-
- public int getWebThreads() {
- return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS);
- }
-
- public File getWebWorkingDirectory() {
- return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR));
- }
-
- public File getComponentDocumentationWorkingDirectory() {
- return new File(getProperty(COMPONENT_DOCS_DIRECTORY, DEFAULT_COMPONENT_DOCS_DIRECTORY));
- }
-
- public File getNarWorkingDirectory() {
- return new File(getProperty(NAR_WORKING_DIRECTORY, DEFAULT_NAR_WORKING_DIR));
- }
-
- public File getFrameworkWorkingDirectory() {
- return new File(getNarWorkingDirectory(), "framework");
- }
-
- public File getExtensionsWorkingDirectory() {
- return new File(getNarWorkingDirectory(), "extensions");
- }
-
- public File getNarLibraryDirectory() {
- return new File(getProperty(NAR_LIBRARY_DIRECTORY, DEFAULT_NAR_LIBRARY_DIR));
- }
-
- // getters for ui properties //
- /**
- * Get the title for the UI.
- *
- * @return The UI title
- */
- public String getUiTitle() {
- return this.getProperty(VERSION, DEFAULT_TITLE);
- }
-
- /**
- * Get the banner text.
- *
- * @return The banner text
- */
- public String getBannerText() {
- return this.getProperty(UI_BANNER_TEXT, StringUtils.EMPTY);
- }
-
- /**
- * Returns the auto refresh interval in seconds.
- *
- * @return
- */
- public String getAutoRefreshInterval() {
- return getProperty(UI_AUTO_REFRESH_INTERVAL);
- }
-
- // getters for cluster protocol properties //
- public String getClusterProtocolHeartbeatInterval() {
- return getProperty(CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
- }
-
- public String getNodeHeartbeatInterval() {
- return getClusterProtocolHeartbeatInterval();
- }
-
- public String getClusterProtocolSocketTimeout() {
- return getProperty(CLUSTER_PROTOCOL_SOCKET_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_SOCKET_TIMEOUT);
- }
-
- public String getClusterProtocolConnectionHandshakeTimeout() {
- return getProperty(CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT, DEFAULT_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT);
- }
-
- public boolean getClusterProtocolUseMulticast() {
- return Boolean.parseBoolean(getProperty(CLUSTER_PROTOCOL_USE_MULTICAST));
- }
-
- public InetSocketAddress getClusterProtocolMulticastAddress() {
- try {
- String multicastAddress = getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS);
- int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT));
- return new InetSocketAddress(multicastAddress, multicastPort);
- } catch (Exception ex) {
- throw new RuntimeException("Invalid multicast address/port due to: " + ex, ex);
- }
- }
-
- public String getClusterProtocolMulticastServiceBroadcastDelay() {
- return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY);
- }
-
- public File getPersistentStateDirectory() {
- final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY, DEFAULT_PERSISTENT_STATE_DIRECTORY);
- final File file = new File(dirName);
- if (!file.exists()) {
- file.mkdirs();
- }
- return file;
- }
-
- public int getClusterProtocolMulticastServiceLocatorAttempts() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS));
- } catch (NumberFormatException nfe) {
- return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS;
- }
- }
-
- public String getClusterProtocolMulticastServiceLocatorAttemptsDelay() {
- return getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY, DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY);
- }
-
- // getters for cluster node properties //
- public boolean isNode() {
- return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
- }
-
- public InetSocketAddress getClusterNodeProtocolAddress() {
- try {
- String socketAddress = getProperty(CLUSTER_NODE_ADDRESS);
- if (StringUtils.isBlank(socketAddress)) {
- socketAddress = "localhost";
- }
- int socketPort = getClusterNodeProtocolPort();
- return InetSocketAddress.createUnresolved(socketAddress, socketPort);
- } catch (Exception ex) {
- throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex);
- }
- }
-
- public Integer getClusterNodeProtocolPort() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT));
- } catch (NumberFormatException nfe) {
- return null;
- }
- }
-
- public int getClusterNodeProtocolThreads() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
- } catch (NumberFormatException nfe) {
- return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS;
- }
- }
-
- public InetSocketAddress getClusterNodeUnicastManagerProtocolAddress() {
- try {
- String socketAddress = getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS);
- if (StringUtils.isBlank(socketAddress)) {
- socketAddress = "localhost";
- }
- int socketPort = Integer.parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT));
- return InetSocketAddress.createUnresolved(socketAddress, socketPort);
- } catch (Exception ex) {
- throw new RuntimeException("Invalid unicast manager address/port due to: " + ex, ex);
- }
- }
-
- // getters for cluster manager properties //
- public boolean isClusterManager() {
- return Boolean.parseBoolean(getProperty(CLUSTER_IS_MANAGER));
- }
-
- public InetSocketAddress getClusterManagerProtocolAddress() {
- try {
- String socketAddress = getProperty(CLUSTER_MANAGER_ADDRESS);
- if (StringUtils.isBlank(socketAddress)) {
- socketAddress = "localhost";
- }
- int socketPort = getClusterManagerProtocolPort();
- return InetSocketAddress.createUnresolved(socketAddress, socketPort);
- } catch (Exception ex) {
- throw new RuntimeException("Invalid manager protocol address/port due to: " + ex, ex);
- }
- }
-
- public Integer getClusterManagerProtocolPort() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT));
- } catch (NumberFormatException nfe) {
- return null;
- }
- }
-
- public File getClusterManagerNodeFirewallFile() {
- final String firewallFile = getProperty(CLUSTER_MANAGER_NODE_FIREWALL_FILE);
- if (StringUtils.isBlank(firewallFile)) {
- return null;
- } else {
- return new File(firewallFile);
- }
- }
-
- public int getClusterManagerNodeEventHistorySize() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE));
- } catch (NumberFormatException nfe) {
- return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE;
- }
- }
-
- public String getClusterManagerNodeApiConnectionTimeout() {
- return getProperty(CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_CONNECTION_TIMEOUT);
- }
-
- public String getClusterManagerNodeApiReadTimeout() {
- return getProperty(CLUSTER_MANAGER_NODE_API_READ_TIMEOUT, DEFAULT_CLUSTER_MANAGER_NODE_API_READ_TIMEOUT);
- }
-
- public int getClusterManagerNodeApiRequestThreads() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS));
- } catch (NumberFormatException nfe) {
- return DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS;
- }
- }
-
- public String getClusterManagerFlowRetrievalDelay() {
- return getProperty(CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY, DEFAULT_CLUSTER_MANAGER_FLOW_RETRIEVAL_DELAY);
- }
-
- public int getClusterManagerProtocolThreads() {
- try {
- return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS));
- } catch (NumberFormatException nfe) {
- return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS;
- }
- }
-
- public String getClusterManagerSafeModeDuration() {
- return getProperty(CLUSTER_MANAGER_SAFEMODE_DURATION, DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION);
- }
-
- public String getClusterProtocolManagerToNodeApiScheme() {
- final String isSecureProperty = getProperty(CLUSTER_PROTOCOL_IS_SECURE);
- if (Boolean.valueOf(isSecureProperty)) {
- return "https";
- } else {
- return "http";
- }
- }
-
- public InetSocketAddress getNodeApiAddress() {
-
- final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
- final String scheme = (rawScheme == null) ? "http" : rawScheme;
-
- final String host;
- final int port;
- if ("http".equalsIgnoreCase(scheme)) {
- // get host
- if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) {
- host = "localhost";
- } else {
- host = getProperty(WEB_HTTP_HOST);
- }
- // get port
- port = getPort();
- } else {
- // get host
- if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) {
- host = "localhost";
- } else {
- host = getProperty(WEB_HTTPS_HOST);
- }
- // get port
- port = getSslPort();
- }
-
- return InetSocketAddress.createUnresolved(host, port);
-
- }
-
- /**
- * Returns the database repository path. It simply returns the value
- * configured. No directories will be created as a result of this operation.
- *
- * @return database repository path
- * @throws InvalidPathException If the configured path is invalid
- */
- public Path getDatabaseRepositoryPath() {
- return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY));
- }
-
- /**
- * Returns the flow file repository path. It simply returns the value
- * configured. No directories will be created as a result of this operation.
- *
- * @return database repository path
- * @throws InvalidPathException If the configured path is invalid
- */
- public Path getFlowFileRepositoryPath() {
- return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY));
- }
-
- /**
- * Returns the content repository paths. This method returns a mapping of
- * file repository name to file repository paths. It simply returns the
- * values configured. No directories will be created as a result of this
- * operation.
- *
- * @return file repositories paths
- * @throws InvalidPathException If any of the configured paths are invalid
- */
- public Map<String, Path> getContentRepositoryPaths() {
- final Map<String, Path> contentRepositoryPaths = new HashMap<>();
-
- // go through each property
- for (String propertyName : stringPropertyNames()) {
- // determine if the property is a file repository path
- if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) {
- // get the repository key
- final String key = StringUtils.substringAfter(propertyName, REPOSITORY_CONTENT_PREFIX);
-
- // attempt to resolve the path specified
- contentRepositoryPaths.put(key, Paths.get(getProperty(propertyName)));
- }
- }
- return contentRepositoryPaths;
- }
-
- /**
- * Returns the provenance repository paths. This method returns a mapping of
- * file repository name to file repository paths. It simply returns the
- * values configured. No directories will be created as a result of this
- * operation.
- *
- * @return
- */
- public Map<String, Path> getProvenanceRepositoryPaths() {
- final Map<String, Path> provenanceRepositoryPaths = new HashMap<>();
-
- // go through each property
- for (String propertyName : stringPropertyNames()) {
- // determine if the property is a file repository path
- if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) {
- // get the repository key
- final String key = StringUtils.substringAfter(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX);
-
- // attempt to resolve the path specified
- provenanceRepositoryPaths.put(key, Paths.get(getProperty(propertyName)));
- }
- }
- return provenanceRepositoryPaths;
- }
-
- public int getMaxFlowFilesPerClaim() {
- try {
- return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM));
- } catch (NumberFormatException nfe) {
- return DEFAULT_MAX_FLOWFILES_PER_CLAIM;
- }
- }
-
- public String getMaxAppendableClaimSize() {
- return getProperty(MAX_APPENDABLE_CLAIM_SIZE);
- }
-
- @Override
- public String getProperty(final String key, final String defaultValue) {
- final String value = super.getProperty(key, defaultValue);
- if (value == null) {
- return null;
- }
-
- if (value.trim().isEmpty()) {
- return defaultValue;
- }
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java b/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
deleted file mode 100644
index aa6f8f3..0000000
--- a/nifi/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- * String Utils based on the Apache Commons Lang String Utils.
- * These simple util methods here allow us to avoid a dependency in the core
- */
-public class StringUtils {
-
- public static final String EMPTY = "";
-
- public static boolean isBlank(final String str) {
- if (str == null || str.isEmpty()) {
- return true;
- }
- for (int i = 0; i < str.length(); i++) {
- if (!Character.isWhitespace(str.charAt(i))) {
- return false;
- }
- }
- return true;
- }
-
- public static boolean isEmpty(final String str) {
- return str == null || str.isEmpty();
- }
-
- public static boolean startsWith(final String str, final String prefix) {
- if (str == null || prefix == null) {
- return (str == null && prefix == null);
- }
- if (prefix.length() > str.length()) {
- return false;
- }
- return str.regionMatches(false, 0, prefix, 0, prefix.length());
- }
-
- public static String substringAfter(final String str, final String separator) {
- if (isEmpty(str)) {
- return str;
- }
- if (separator == null) {
- return EMPTY;
- }
- int pos = str.indexOf(separator);
- if (pos == -1) {
- return EMPTY;
- }
- return str.substring(pos + separator.length());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/pom.xml b/nifi/commons/nifi-security-utils/pom.xml
deleted file mode 100644
index 8d323e4..0000000
--- a/nifi/commons/nifi-security-utils/pom.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-security-utils</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- <name>NiFi Security Utils</name>
- <description>Contains security functionality.</description>
-
- <dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- </dependencies>
-</project>
-
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
deleted file mode 100644
index 087d891..0000000
--- a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.security.util;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.security.KeyStore;
-import java.security.cert.CertificateParsingException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class CertificateUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(CertificateUtils.class);
-
- /**
- * Returns true if the given keystore can be loaded using the given keystore
- * type and password. Returns false otherwise.
- * @param keystore
- * @param keystoreType
- * @param password
- * @return
- */
- public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) {
-
- if (keystore == null) {
- throw new IllegalArgumentException("keystore may not be null");
- } else if (keystoreType == null) {
- throw new IllegalArgumentException("keystore type may not be null");
- } else if (password == null) {
- throw new IllegalArgumentException("password may not be null");
- }
-
- BufferedInputStream bis = null;
- final KeyStore ks;
- try {
-
- // load the keystore
- bis = new BufferedInputStream(keystore.openStream());
- ks = KeyStore.getInstance(keystoreType.name());
- ks.load(bis, password);
-
- return true;
-
- } catch (Exception e) {
- return false;
- } finally {
- if (bis != null) {
- try {
- bis.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close input stream", ioe);
- }
- }
- }
- }
-
- /**
- * Extracts the username from the specified DN. If the username cannot be
- * extracted because the CN is in an unrecognized format, the entire CN is
- * returned. If the CN cannot be extracted because the DN is in an
- * unrecognized format, the entire DN is returned.
- *
- * @param dn
- * @return
- */
- public static String extractUsername(String dn) {
- String username = dn;
- String cn = "";
-
- // ensure the dn is specified
- if (StringUtils.isNotBlank(dn)) {
-
- // attempt to locate the cn
- if (dn.startsWith("CN=")) {
- cn = StringUtils.substringBetween(dn, "CN=", ",");
- } else if (dn.startsWith("/CN=")) {
- cn = StringUtils.substringBetween(dn, "CN=", "/");
- } else if (dn.startsWith("C=") || dn.startsWith("/C=")) {
- cn = StringUtils.substringAfter(dn, "CN=");
- } else if (dn.startsWith("/") && StringUtils.contains(dn, "CN=")) {
- cn = StringUtils.substringAfter(dn, "CN=");
- }
-
- // attempt to get the username from the cn
- if (StringUtils.isNotBlank(cn)) {
- if (cn.endsWith(")")) {
- username = StringUtils.substringBetween(cn, "(", ")");
- } else if (cn.contains(" ")) {
- username = StringUtils.substringAfterLast(cn, " ");
- } else {
- username = cn;
- }
- }
- }
-
- return username;
- }
-
- /**
- * Returns a list of subject alternative names. Any name that is represented
- * as a String by X509Certificate.getSubjectAlternativeNames() is converted
- * to lowercase and returned.
- *
- * @param certificate a certificate
- * @return a list of subject alternative names; list is never null
- * @throws CertificateParsingException if parsing the certificate failed
- */
- public static List<String> getSubjectAlternativeNames(final X509Certificate certificate) throws CertificateParsingException {
-
- final Collection<List<?>> altNames = certificate.getSubjectAlternativeNames();
- if (altNames == null) {
- return new ArrayList<>();
- }
-
- final List<String> result = new ArrayList<>();
- for (final List<?> generalName : altNames) {
- /*
- * generalName has the name type as the first element a String or
- * byte array for the second element. We return any general names
- * that are String types.
- *
- * We don't inspect the numeric name type because some certificates
- * incorrectly put IPs and DNS names under the wrong name types.
- */
- final Object value = generalName.get(1);
- if (value instanceof String) {
- result.add(((String) value).toLowerCase());
- }
-
- }
-
- return result;
- }
-
- private CertificateUtils() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
deleted file mode 100644
index 741fdde..0000000
--- a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.security.util;
-
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-/**
- * Enumeration capturing essential information about the various encryption
- * methods that might be supported.
- *
- * @author none
- */
-public enum EncryptionMethod {
-
- MD5_128AES("PBEWITHMD5AND128BITAES-CBC-OPENSSL", "BC", false),
- MD5_256AES("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", false),
- SHA1_RC2("PBEWITHSHA1ANDRC2", "BC", false),
- SHA1_DES("PBEWITHSHA1ANDDES", "BC", false),
- MD5_192AES("PBEWITHMD5AND192BITAES-CBC-OPENSSL", "BC", false),
- MD5_DES("PBEWITHMD5ANDDES", "BC", false),
- MD5_RC2("PBEWITHMD5ANDRC2", "BC", false),
- SHA_192AES("PBEWITHSHAAND192BITAES-CBC-BC", "BC", true),
- SHA_40RC4("PBEWITHSHAAND40BITRC4", "BC", true),
- SHA256_128AES("PBEWITHSHA256AND128BITAES-CBC-BC", "BC", true),
- SHA_128RC2("PBEWITHSHAAND128BITRC2-CBC", "BC", true),
- SHA_128AES("PBEWITHSHAAND128BITAES-CBC-BC", "BC", true),
- SHA256_192AES("PBEWITHSHA256AND192BITAES-CBC-BC", "BC", true),
- SHA_2KEYTRIPLEDES("PBEWITHSHAAND2-KEYTRIPLEDES-CBC", "BC", true),
- SHA256_256AES("PBEWITHSHA256AND256BITAES-CBC-BC", "BC", true),
- SHA_40RC2("PBEWITHSHAAND40BITRC2-CBC", "BC", true),
- SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true),
- SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true),
- SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true),
- SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true);
- private final String algorithm;
- private final String provider;
- private final boolean unlimitedStrength;
-
- EncryptionMethod(String algorithm, String provider, boolean unlimitedStrength) {
- this.algorithm = algorithm;
- this.provider = provider;
- this.unlimitedStrength = unlimitedStrength;
- }
-
- public String getProvider() {
- return provider;
- }
-
- public String getAlgorithm() {
- return algorithm;
- }
-
- /**
- * @return true if algorithm requires unlimited strength policies
- */
- public boolean isUnlimitedStrength() {
- return unlimitedStrength;
- }
-
- @Override
- public String toString() {
- final ToStringBuilder builder = new ToStringBuilder(this);
- ToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
- builder.append("algorithm name", algorithm);
- builder.append("Requires unlimited strength JCE policy", unlimitedStrength);
- builder.append("Algorithm Provider", provider);
- return builder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
deleted file mode 100644
index 18574bb..0000000
--- a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.security.util;
-
-/**
- * Keystore types.
- */
-public enum KeystoreType {
-
- PKCS12,
- JKS;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
deleted file mode 100644
index 9abfcc3..0000000
--- a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.security.util;
-
-import java.io.PrintWriter;
-import java.io.Writer;
-
-/**
- * Types of security stores and their related Java system properties.
- */
-public enum SecurityStoreTypes {
-
- TRUSTSTORE(
- "javax.net.ssl.trustStore",
- "javax.net.ssl.trustStorePassword",
- "javax.net.ssl.trustStoreType"),
- KEYSTORE(
- "javax.net.ssl.keyStore",
- "javax.net.ssl.keyStorePassword",
- "javax.net.ssl.keyStoreType");
-
- /**
- * Logs the keystore and truststore Java system property values to the given
- * writer. It logPasswords is true, then the keystore and truststore
- * password property values are logged.
- *
- * @param writer a writer to log to
- *
- * @param logPasswords true if passwords should be logged; false otherwise
- */
- public static void logProperties(final Writer writer,
- final boolean logPasswords) {
- if (writer == null) {
- return;
- }
-
- PrintWriter pw = new PrintWriter(writer);
-
- // keystore properties
- pw.println(
- KEYSTORE.getStoreProperty() + " = " + System.getProperty(KEYSTORE.getStoreProperty()));
-
- if (logPasswords) {
- pw.println(
- KEYSTORE.getStorePasswordProperty() + " = "
- + System.getProperty(KEYSTORE.getStoreProperty()));
- }
-
- pw.println(
- KEYSTORE.getStoreTypeProperty() + " = "
- + System.getProperty(KEYSTORE.getStoreTypeProperty()));
-
- // truststore properties
- pw.println(
- TRUSTSTORE.getStoreProperty() + " = "
- + System.getProperty(TRUSTSTORE.getStoreProperty()));
-
- if (logPasswords) {
- pw.println(
- TRUSTSTORE.getStorePasswordProperty() + " = "
- + System.getProperty(TRUSTSTORE.getStoreProperty()));
- }
-
- pw.println(
- TRUSTSTORE.getStoreTypeProperty() + " = "
- + System.getProperty(TRUSTSTORE.getStoreTypeProperty()));
- pw.flush();
- }
-
- /**
- * the Java system property for setting the keystore (or truststore) path
- */
- private String storeProperty = "";
-
- /**
- * the Java system property for setting the keystore (or truststore)
- * password
- */
- private String storePasswordProperty = "";
-
- /**
- * the Java system property for setting the keystore (or truststore) type
- */
- private String storeTypeProperty = "";
-
- /**
- * Creates an instance.
- *
- * @param storeProperty the Java system property for setting the keystore (
- * or truststore) path
- * @param storePasswordProperty the Java system property for setting the
- * keystore (or truststore) password
- * @param storeTypeProperty the Java system property for setting the
- * keystore (or truststore) type
- */
- SecurityStoreTypes(final String storeProperty,
- final String storePasswordProperty,
- final String storeTypeProperty) {
- this.storeProperty = storeProperty;
- this.storePasswordProperty = storePasswordProperty;
- this.storeTypeProperty = storeTypeProperty;
- }
-
- /**
- * Returns the keystore (or truststore) property.
- *
- * @return the keystore (or truststore) property
- */
- public String getStoreProperty() {
- return storeProperty;
- }
-
- /**
- * Returns the keystore (or truststore) password property.
- *
- * @return the keystore (or truststore) password property
- */
- public String getStorePasswordProperty() {
- return storePasswordProperty;
- }
-
- /**
- * Returns the keystore (or truststore) type property.
- *
- * @return the keystore (or truststore) type property
- */
- public String getStoreTypeProperty() {
- return storeTypeProperty;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java b/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
deleted file mode 100644
index 2371b0c..0000000
--- a/nifi/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.security.util;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-/**
- * A factory for creating SSL contexts using the application's security
- * properties.
- *
- * @author unattributed
- */
-public final class SslContextFactory {
-
- public static enum ClientAuth {
-
- WANT,
- REQUIRED,
- NONE
- }
-
- /**
- * Creates a SSLContext instance using the given information.
- *
- * @param keystore the full path to the keystore
- * @param keystorePasswd the keystore password
- * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
- * @param truststore the full path to the truststore
- * @param truststorePasswd the truststore password
- * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
- * @param clientAuth the type of client authentication
- *
- * @return a SSLContext instance
- * @throws java.security.KeyStoreException
- * @throws java.io.IOException
- * @throws java.security.NoSuchAlgorithmException
- * @throws java.security.cert.CertificateException
- * @throws java.security.UnrecoverableKeyException
- * @throws java.security.KeyManagementException
- */
- public static SSLContext createSslContext(
- final String keystore, final char[] keystorePasswd, final String keystoreType,
- final String truststore, final char[] truststorePasswd, final String truststoreType,
- final ClientAuth clientAuth)
- throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
- UnrecoverableKeyException, KeyManagementException {
-
- // prepare the keystore
- final KeyStore keyStore = KeyStore.getInstance(keystoreType);
- try (final InputStream keyStoreStream = new FileInputStream(keystore)) {
- keyStore.load(keyStoreStream, keystorePasswd);
- }
- final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- keyManagerFactory.init(keyStore, keystorePasswd);
-
- // prepare the truststore
- final KeyStore trustStore = KeyStore.getInstance(truststoreType);
- try (final InputStream trustStoreStream = new FileInputStream(truststore)) {
- trustStore.load(trustStoreStream, truststorePasswd);
- }
- final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(trustStore);
-
- // initialize the ssl context
- final SSLContext sslContext = SSLContext.getInstance("TLS");
- sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
- if (ClientAuth.REQUIRED == clientAuth) {
- sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
- } else if (ClientAuth.WANT == clientAuth) {
- sslContext.getDefaultSSLParameters().setWantClientAuth(true);
- } else {
- sslContext.getDefaultSSLParameters().setWantClientAuth(false);
- }
-
- return sslContext;
-
- }
-
- /**
- * Creates a SSLContext instance using the given information.
- *
- * @param keystore the full path to the keystore
- * @param keystorePasswd the keystore password
- * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
- *
- * @return a SSLContext instance
- * @throws java.security.KeyStoreException
- * @throws java.io.IOException
- * @throws java.security.NoSuchAlgorithmException
- * @throws java.security.cert.CertificateException
- * @throws java.security.UnrecoverableKeyException
- * @throws java.security.KeyManagementException
- */
- public static SSLContext createSslContext(
- final String keystore, final char[] keystorePasswd, final String keystoreType)
- throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
- UnrecoverableKeyException, KeyManagementException {
-
- // prepare the keystore
- final KeyStore keyStore = KeyStore.getInstance(keystoreType);
- try (final InputStream keyStoreStream = new FileInputStream(keystore)) {
- keyStore.load(keyStoreStream, keystorePasswd);
- }
- final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- keyManagerFactory.init(keyStore, keystorePasswd);
-
- // initialize the ssl context
- final SSLContext ctx = SSLContext.getInstance("TLS");
- ctx.init(keyManagerFactory.getKeyManagers(), new TrustManager[0], new SecureRandom());
-
- return ctx;
-
- }
-
- /**
- * Creates a SSLContext instance using the given information.
- *
- * @param truststore the full path to the truststore
- * @param truststorePasswd the truststore password
- * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
- *
- * @return a SSLContext instance
- * @throws java.security.KeyStoreException
- * @throws java.io.IOException
- * @throws java.security.NoSuchAlgorithmException
- * @throws java.security.cert.CertificateException
- * @throws java.security.UnrecoverableKeyException
- * @throws java.security.KeyManagementException
- */
- public static SSLContext createTrustSslContext(
- final String truststore, final char[] truststorePasswd, final String truststoreType)
- throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
- UnrecoverableKeyException, KeyManagementException {
-
- // prepare the truststore
- final KeyStore trustStore = KeyStore.getInstance(truststoreType);
- try (final InputStream trustStoreStream = new FileInputStream(truststore)) {
- trustStore.load(trustStoreStream, truststorePasswd);
- }
- final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(trustStore);
-
- // initialize the ssl context
- final SSLContext ctx = SSLContext.getInstance("TLS");
- ctx.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom());
-
- return ctx;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/pom.xml b/nifi/commons/nifi-socket-utils/pom.xml
deleted file mode 100644
index cc2a2b6..0000000
--- a/nifi/commons/nifi-socket-utils/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-socket-utils</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- <name>NiFi Socket Utils</name>
- <description>Utilities for socket communication</description>
-
- <dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-logging-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
deleted file mode 100644
index 172c593..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.io.nio.consumer.StreamConsumer;
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * @author none
- */
-public abstract class AbstractChannelReader implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
- private final String uniqueId;
- private final SelectionKey key;
- private final BufferPool bufferPool;
- private final StreamConsumer consumer;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);//the future on which this reader runs...
-
- public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
- this.uniqueId = id;
- this.key = key;
- this.bufferPool = empties;
- this.consumer = consumerFactory.newInstance(id);
- consumer.setReturnBufferQueue(bufferPool);
- }
-
- protected void setScheduledFuture(final ScheduledFuture<?> future) {
- this.future.set(future);
- }
-
- protected ScheduledFuture<?> getScheduledFuture() {
- return future.get();
- }
-
- protected SelectionKey getSelectionKey() {
- return key;
- }
-
- public boolean isClosed() {
- return isClosed.get();
- }
-
- private void closeStream() {
- if (isClosed.get()) {
- return;
- }
- try {
- isClosed.set(true);
- future.get().cancel(false);
- key.cancel();
- key.channel().close();
- } catch (final IOException ioe) {
- LOGGER.warn("Unable to cleanly close stream due to " + ioe);
- } finally {
- consumer.signalEndOfStream();
- }
- }
-
- /**
- * Allows a subclass to specifically handle how it reads from the given
- * key's channel into the given buffer.
- *
- * @param key
- * @param buffer
- * @return the number of bytes read in the final read cycle. A value of zero
- * or more indicates the channel is still open but a value of -1 indicates
- * end of stream.
- * @throws IOException
- */
- protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;
-
- @Override
- public final void run() {
- if (!key.isValid() || consumer.isConsumerFinished()) {
- closeStream();
- return;
- }
- if (!key.isReadable()) {
- return;//there is nothing available to read...or we aren't allow to read due to throttling
- }
- ByteBuffer buffer = null;
- try {
- buffer = bufferPool.poll();
- if (buffer == null) {
- return; // no buffers available - come back later
- }
- final int bytesRead = fillBuffer(key, buffer);
- buffer.flip();
- if (buffer.remaining() > 0) {
- consumer.addFilledBuffer(buffer);
- buffer = null; //clear the reference - is now the consumer's responsiblity
- } else {
- buffer.clear();
- bufferPool.returnBuffer(buffer, 0);
- buffer = null; //clear the reference - is now back to the queue
- }
- if (bytesRead < 0) { //we've reached the end
- closeStream();
- }
- } catch (final Exception ioe) {
- closeStream();
- LOGGER.error("Closed channel reader " + this + " due to " + ioe);
- } finally {
- if (buffer != null) {
- buffer.clear();
- bufferPool.returnBuffer(buffer, 0);
- }
- }
- }
-
- @Override
- public final boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- if (obj.getClass() != getClass()) {
- return false;
- }
- AbstractChannelReader rhs = (AbstractChannelReader) obj;
- return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
- }
-
- @Override
- public final int hashCode() {
- return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
- }
-
- @Override
- public final String toString() {
- return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
deleted file mode 100644
index a413ad2..0000000
--- a/nifi/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * @author none
- */
-public class BufferPool implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
- final BlockingQueue<ByteBuffer> bufferPool;
- private final static double ONE_MB = 1 << 20;
- private Calendar lastRateSampleTime = Calendar.getInstance();
- private final Calendar startTime = Calendar.getInstance();
- double lastRateSampleMBps = -1.0;
- double overallMBps = -1.0;
- private long totalBytesExtracted = 0L;
- private long lastTotalBytesExtracted = 0L;
- final double maxRateMBps;
-
- public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) {
- bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect));
- this.maxRateMBps = maxRateMBps;
- }
-
- /**
- * Returns the given buffer to the pool - and clears it.
- *
- * @param buffer
- * @param bytesProcessed
- * @return
- */
- public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
- totalBytesExtracted += bytesProcessed;
- buffer.clear();
- return bufferPool.add(buffer);
- }
-
- //here we enforce the desired rate we want by restricting access to buffers when we're over rate
- public synchronized ByteBuffer poll() {
- computeRate();
- final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3);
- if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) {
- return null;
- }
- return bufferPool.poll();
- }
-
- public int size() {
- return bufferPool.size();
- }
-
- private synchronized void computeRate() {
- final Calendar now = Calendar.getInstance();
- final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis();
- final double duractionSecs = ((double) measurementDurationMillis) / 1000.0;
- if (duractionSecs >= 0.75) { //recompute every 3/4 second or when we're too fast
- final long totalDuractionMillis = now.getTimeInMillis() - startTime.getTimeInMillis();
- final double totalDurationSecs = ((double) totalDuractionMillis) / 1000.0;
- final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted;
- lastTotalBytesExtracted = totalBytesExtracted;
- lastRateSampleTime = now;
- final double bps = ((double) differenceBytes) / duractionSecs;
- final double totalBps = ((double) totalBytesExtracted / totalDurationSecs);
- lastRateSampleMBps = bps / ONE_MB;
- overallMBps = totalBps / ONE_MB;
- }
- }
-
- public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) {
- final List<ByteBuffer> buffers = new ArrayList<>();
- for (int i = 0; i < bufferCount; i++) {
- final ByteBuffer buffer = (allocateDirect) ? ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity);
- buffers.add(buffer);
- }
- return buffers;
- }
-
- private void logChannelReadRates() {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted));
- }
- }
-
- @Override
- public void run() {
- computeRate();
- logChannelReadRates();
- }
-}