You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:44 UTC
[03/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml
deleted file mode 100644
index ded3efc..0000000
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ /dev/null
@@ -1,124 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>streams-runtimes</artifactId>
- <groupId>org.apache.streams</groupId>
- <version>0.4-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>streams-runtime-storm</artifactId>
- <name>${project.artifactId}</name>
- <description>Apache Streams Runtimes</description>
-
- <properties>
- <storm.version>0.9.1-incubating</storm.version>
- <scala.version>2.9.2</scala.version>
- <zkclient.version>0.4</zkclient.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-config</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-util</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>4.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>compile</scope>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>${zkclient.version}</version>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-testing</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
----------------------------------------------------------------------
diff --git a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
index 8f22450..6344c3c 100644
--- a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
+++ b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
@@ -40,47 +40,50 @@ import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+/**
+ * Test validity of documents vs schemas.
+ */
public class SchemaValidationTest {
- private final static Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class);
- private final static ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new ObjectMapper();
- /**
- * Tests that activities matching core-ex* can be parsed by apache streams
- *
- * @throws Exception
- */
- @Test
- public void validateToSchema() throws Exception {
+ /**
+ * Tests that activities matching core-ex* can be parsed by apache streams.
+ *
+ * @throws Exception Test Exception
+ */
+ @Test
+ public void testValidateToSchema() throws Exception {
- JsonSchemaFactory factory = new JsonSchemaFactory();
+ JsonSchemaFactory factory = new JsonSchemaFactory();
- InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader()
- .getResourceAsStream("activities");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+ InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
- for (String file : files) {
- if( !file.startsWith(".") ) {
+ for (String file : files) {
+ if ( !file.startsWith(".") ) {
- LOGGER.info("Test File: activities/" + file);
- String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file)));
- LOGGER.info("Test Document JSON: " + testFileString);
- JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Test Document Object:" + testNode);
- LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file);
- String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file)));
- LOGGER.info("Test Schema JSON: " + testSchemaString);
- JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Test Schema Object:" + testSchemaNode);
- JsonSchema testSchema = factory.getSchema(testSchemaNode);
- LOGGER.info("Test Schema:" + testSchema);
+ LOGGER.info("Test File: activities/" + file);
+ String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file)));
+ LOGGER.info("Test Document JSON: " + testFileString);
+ JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Test Document Object:" + testNode);
+ LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file);
+ String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file)));
+ LOGGER.info("Test Schema JSON: " + testSchemaString);
+ JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Test Schema Object:" + testSchemaNode);
+ JsonSchema testSchema = factory.getSchema(testSchemaNode);
+ LOGGER.info("Test Schema:" + testSchema);
- Set<ValidationMessage> errors = testSchema.validate(testNode);
- assertThat(errors.size(), is(0));
+ Set<ValidationMessage> errors = testSchema.validate(testNode);
+ assertThat(errors.size(), is(0));
- }
- }
+ }
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
index b1b5824..8500efd 100644
--- a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
+++ b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
@@ -33,103 +33,106 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+/**
+ * Tests that the w3c activitystreams example documents can be parsed by apache streams.
+ */
public class ExamplesSerDeIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class);
-
- private final static ObjectMapper MAPPER = new ObjectMapper();
-
- /**
- * Tests that activities matching core-ex* can be parsed by apache streams
- *
- * @throws Exception
- */
- @Test
- public void testCoreSerDe() throws Exception {
-
- InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
- .getResourceAsStream("w3c/activitystreams-master/test");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
- for (String file : files) {
- if( !file.startsWith(".") && file.contains("core-ex") ) {
- LOGGER.info("File: activitystreams-master/test/" + file);
- String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
- LOGGER.info("Content: " + testFileString);
- ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Object:" + testFileObjectNode);
- }
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class);
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /**
+ * Tests that activities matching core-ex* can be parsed by apache streams.
+ *
+ * @throws Exception test exception
+ */
+ @Test
+ public void testCoreSerDe() throws Exception {
+
+ InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+ .getResourceAsStream("w3c/activitystreams-master/test");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for (String file : files) {
+ if ( !file.startsWith(".") && file.contains("core-ex") ) {
+ LOGGER.info("File: activitystreams-master/test/" + file);
+ String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+ LOGGER.info("Content: " + testFileString);
+ ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Object:" + testFileObjectNode);
+ }
}
-
- /**
- * Tests that activities matching simple* can be parsed by apache streams
- *
- * @throws Exception
- */
- @Test
- public void testSimpleSerDe() throws Exception {
-
- InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
- .getResourceAsStream("w3c/activitystreams-master/test");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
- for (String file : files) {
- if( !file.startsWith(".") && file.contains("simple") ) {
- LOGGER.info("File: activitystreams-master/test/" + file);
- String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
- LOGGER.info("Content: " + testFileString);
- ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Object:" + testFileObjectNode);
- }
- }
+ }
+
+ /**
+ * Tests that activities matching simple* can be parsed by apache streams.
+ *
+ * @throws Exception test exception
+ */
+ @Test
+ public void testSimpleSerDe() throws Exception {
+
+ InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+ .getResourceAsStream("w3c/activitystreams-master/test");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for (String file : files) {
+ if ( !file.startsWith(".") && file.contains("simple") ) {
+ LOGGER.info("File: activitystreams-master/test/" + file);
+ String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+ LOGGER.info("Content: " + testFileString);
+ ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Object:" + testFileObjectNode);
+ }
}
-
- /**
- * Tests that activities matching vocabulary-ex* can be parsed by apache streams
- *
- * @throws Exception
- */
- @Ignore
- @Test
- public void testVocabularySerDe() throws Exception {
-
- InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
- .getResourceAsStream("w3c/activitystreams-master/test");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
- for (String file : files) {
- if( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
- LOGGER.info("File: activitystreams-master/test/" + file);
- String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
- LOGGER.info("Content: " + testFileString);
- ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Object:" + testFileObjectNode);
- }
- }
+ }
+
+ /**
+ * Tests that activities matching vocabulary-ex* can be parsed by apache streams.
+ *
+ * @throws Exception test exception
+ */
+ @Ignore
+ @Test
+ public void testVocabularySerDe() throws Exception {
+
+ InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+ .getResourceAsStream("w3c/activitystreams-master/test");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for (String file : files) {
+ if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
+ LOGGER.info("File: activitystreams-master/test/" + file);
+ String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+ LOGGER.info("Content: " + testFileString);
+ ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Object:" + testFileObjectNode);
+ }
}
-
- /**
- * Tests that activities expect to fail cannot be parsed by apache streams
- *
- * @throws Exception
- */
- @Ignore
- @Test
- public void testFailSerDe() throws Exception {
-
- InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
- .getResourceAsStream("w3c/activitystreams-master/test/fail");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
- for (String file : files) {
- if( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
- LOGGER.info("File: activitystreams-master/test/fail/" + file);
- String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
- LOGGER.info("Content: " + testFileString);
- ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
- LOGGER.info("Object:" + testFileObjectNode);
- }
- }
+ }
+
+ /**
+ * Tests that activities expected to fail cannot be parsed by apache streams.
+ *
+ * @throws Exception test exception
+ */
+ @Ignore
+ @Test
+ public void testFailSerDe() throws Exception {
+
+ InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+ .getResourceAsStream("w3c/activitystreams-master/test/fail");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for (String file : files) {
+ if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
+ LOGGER.info("File: activitystreams-master/test/fail/" + file);
+ String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+ LOGGER.info("Content: " + testFileString);
+ ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+ LOGGER.info("Object:" + testFileObjectNode);
+ }
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 6037f28..514c851 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -15,126 +15,116 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.util;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
/**
* Common utilities for Streams components.
*/
public class ComponentUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
- /**
- * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors
- * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been
- * successfully queued onto the desired queue
- * @param entry item to queue
- * @param queue queue to add the entry to
- * @param <T>
- */
- public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
- boolean success;
- do {
- success = queue.offer(entry);
- Thread.yield();
- }
- while( !success );
+ /**
+ * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors
+ * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been
+ * successfully queued onto the desired queue
+ * @param entry item to queue
+ * @param queue queue to add the entry to
+ * @param <T> type
+ */
+ public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
+ boolean success;
+ do {
+ success = queue.offer(entry);
+ Thread.yield();
}
+ while ( !success );
+ }
- /**
- * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending
- * on the type of queue. <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned
- * or the queue is empty. If the queue is empty it will return NULL.
- * @param queue
- * @param <T>
- * @return
- */
- public static <T> T pollWhileNotEmpty(Queue<T> queue) {
- T item = queue.poll();
- while(!queue.isEmpty() && item == null) {
- Thread.yield();
- item = queue.poll();
- }
- return item;
+ /**
+ * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending
+ * on the type of queue. <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned
+ * or the queue is empty. If the queue is empty it will return NULL.
+ * @param queue queue to read the entry from
+ * @param <T> type
+ * @return result
+ */
+ public static <T> T pollWhileNotEmpty(Queue<T> queue) {
+ T item = queue.poll();
+ while (!queue.isEmpty() && item == null) {
+ Thread.yield();
+ item = queue.poll();
}
+ return item;
+ }
-
- public static String pollUntilStringNotEmpty(Queue queue) {
-
- String result = null;
- do {
- synchronized( ComponentUtils.class ) {
- try {
- result = (String) queue.remove();
- } catch( Exception e ) {}
- }
- Thread.yield();
+ /**
+ * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()}
+ * and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
+ * of an {@link java.util.concurrent.ExecutorService}.
+ * @param stream service to be shutdown
+ * @param initialWait time in seconds to wait for currently running threads to finish execution
+ * @param secondaryWait time in seconds to wait for running threads that did not terminate to acknowledge their forced termination
+ */
+ public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
+ stream.shutdown();
+ try {
+ if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
+ stream.shutdownNow();
+ if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
+ LOGGER.error("Executor Service did not terminate");
}
- while( result == null && !StringUtils.isNotEmpty(result) );
-
- return result;
+ }
+ } catch (InterruptedException ie) {
+ stream.shutdownNow();
+ Thread.currentThread().interrupt();
}
+ }
- /**
- * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()} and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
- * of an {@link java.util.concurrent.ExecutorService}.
- * @param stream service to be shutdown
- * @param initialWait time in seconds to wait for currently running threads to finish execution
- * @param secondaryWait time in seconds to wait for running threads that did not terminate in the first wait to acknowledge their forced termination
- */
- public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
- stream.shutdown();
- try {
- if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
- stream.shutdownNow();
- if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
- LOGGER.error("Executor Service did not terminate");
- }
- }
- } catch (InterruptedException ie) {
- stream.shutdownNow();
- Thread.currentThread().interrupt();
- }
+ /**
+ * Removes all mbeans registered under a specific domain. Made specifically to clean up in unit tests.
+ * @param domain mbean domain
+ */
+ public static void removeAllMBeansOfDomain(String domain) throws Exception {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ domain = domain.endsWith(":") ? domain : domain + ":";
+ ObjectName objectName = new ObjectName(domain + "*");
+ Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
+ for (ObjectName name : mbeanNames) {
+ mbs.unregisterMBean(name);
}
+ }
- /**
- * Removes all mbeans registered undered a specific domain. Made specificly to clean up at unit tests
- * @param domain
- */
- public static void removeAllMBeansOfDomain(String domain) throws Exception {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- domain = domain.endsWith(":") ? domain : domain+":";
- ObjectName objectName = new ObjectName(domain+"*");
- Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
- for(ObjectName name : mbeanNames) {
- mbs.unregisterMBean(name);
- }
- }
-
- /**
- * Attempts to register an object with local MBeanServer. Throws runtime exception on errors.
- * @param name name to register bean with
- * @param mbean mbean to register
- */
- public static <V> void registerLocalMBean(String name, V mbean) {
- try {
- ObjectName objectName = new ObjectName(name);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(mbean, objectName);
- } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
- LOGGER.error("Failed to register MXBean : {}", e);
- throw new RuntimeException(e);
- }
+ /**
+ * Attempts to register an object with local MBeanServer. Throws runtime exception on errors.
+ * @param name name to register bean with
+ * @param mbean mbean to register
+ */
+ public static <V> void registerLocalMBean(String name, V mbean) {
+ try {
+ ObjectName objectName = new ObjectName(name);
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(mbean, objectName);
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException ex) {
+ LOGGER.error("Failed to register MXBean : {}", ex);
+ throw new RuntimeException(ex);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java b/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
deleted file mode 100644
index 7bbb8e9..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-
-/*
- *
- * If you can think of a better way, feel free to implement. This was a great class that I found that
- * solves the majority of the issue I was dealing with.
- *
- * smashew 11=13=2012
- *
- * Site:
- * http://stackoverflow.com/questions/3389348/parse-any-date-in-java
- */
-
-public class DateUtil
-{
-
- private static final String REGEX_ONLY_NUMBERS = "[0-9]+";
-
- private static final Map<String, String> DATE_FORMAT_REGEXPS = new HashMap<String, String>()
- {
- private static final long serialVersionUID = 1L;
- {
- put("^\\d{8}$", "yyyyMMdd");
- put("^\\d{1,2}-\\d{1,2}-\\d{4}$", "dd-MM-yyyy");
- put("^\\d{4}-\\d{1,2}-\\d{1,2}$", "yyyy-MM-dd");
- put("^\\d{1,2}/\\d{1,2}/\\d{4}$", "MM/dd/yyyy");
- put("^\\d{4}/\\d{1,2}/\\d{1,2}$", "yyyy/MM/dd");
- put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}$", "dd MMM yyyy");
- put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}$", "dd MMMM yyyy");
- put("^\\d{12}$", "yyyyMMddHHmm");
- put("^\\d{8}\\s\\d{4}$", "yyyyMMdd HHmm");
- put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}$", "dd-MM-yyyy HH:mm");
- put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy-MM-dd HH:mm");
- put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}$", "MM/dd/yyyy HH:mm");
- put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy/MM/dd HH:mm");
- put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMM yyyy HH:mm");
- put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMMM yyyy HH:mm");
- put("^\\d{14}$", "yyyyMMddHHmmss");
- put("^\\d{8}\\s\\d{6}$", "yyyyMMdd HHmmss");
- put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd-MM-yyyy HH:mm:ss");
- put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy-MM-dd HH:mm:ss");
- put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "MM/dd/yyyy HH:mm:ss");
- put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy/MM/dd HH:mm:ss");
- put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMM yyyy HH:mm:ss");
- put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMMM yyyy HH:mm:ss");
- }
- };
-
- /**
- * Determine SimpleDateFormat pattern matching with the given date string. Returns null if format is unknown. You
- * can simply extend DateUtil with more formats if needed.
- *
- * @param dateString
- * The date string to determine the SimpleDateFormat pattern for.
- * @return The matching SimpleDateFormat pattern, or null if format is unknown.
- * @see java.text.SimpleDateFormat
- */
- public static String determineDateFormat(String dateString)
- throws ParseException
- {
- for (String regexp : DATE_FORMAT_REGEXPS.keySet())
- if (dateString.toLowerCase().matches(regexp))
- return DATE_FORMAT_REGEXPS.get(regexp);
-
- throw new ParseException("unable to parse date",0);
- }
-
- public static DateTime determineDate(String dateString)
- throws ParseException
- {
- // Trim the string just in case it is dirty.
- dateString = dateString.trim();
-
- // check to see if it looks like it is millis. If so, parse as millis and return.
- if(dateString.matches(REGEX_ONLY_NUMBERS))
- return new DateTime(new Date(Long.parseLong(dateString)));
-
- try
- {
- // try to parse the string into a java.date object, if possible.
- SimpleDateFormat dateFormat = new SimpleDateFormat(determineDateFormat(dateString));
- dateFormat.setLenient(false);
- return new DateTime(dateFormat.parse(dateString));
- }
- catch(Exception e)
- {
-
- }
-
- return new DateTime(DateTime.parse(dateString));
- }
-
- public static DateTime determineDateTime(String dateString)
- throws ParseException
- {
- return new DateTime(determineDate(dateString));
- }
-
- public static DateTime determineDateTime(String dateString, DateTimeZone theTimeZone)
- throws ParseException
- {
- DateTime beforeTimeZone = determineDateTime(dateString);
- return new DateTime(beforeTimeZone.getYear(),beforeTimeZone.getMonthOfYear(), beforeTimeZone.getDayOfMonth(), beforeTimeZone.getHourOfDay(), beforeTimeZone.getMinuteOfHour(), beforeTimeZone.getSecondOfMinute(), beforeTimeZone.getMillisOfSecond(), theTimeZone);
- }
-
-
- public static String getAliasForDate(String date, String prefix) throws ParseException {
- return getAliasesForDateRange(date, null, prefix).iterator().next();
- }
-
- public static String getAliasForDate(DateTime date, String prefix) throws ParseException {
- return getAliasesForDateRange(date, null, prefix).iterator().next();
- }
-
- public static Set<String> getAliasesForDateRange(String starDate, String endDate, String prefix)
- throws ParseException
- {
- DateTime start = null;
- DateTime end = null;
- DateTimeFormatter df = ISODateTimeFormat.dateTimeNoMillis();
- try {
- start = df.parseDateTime(starDate);
- } catch (Exception e) {
- //do nothing. try to parse with other parsers
- }
- if(start == null) {
- start = determineDateTime(starDate);
- }
- if(endDate != null) {
- try {
- end = df.parseDateTime(endDate);
- } catch (Exception e) {
- //do nothing. try to parse with other parsers
- }
- if( end == null)
- end = determineDateTime(endDate);
- }
- return getAliasesForDateRange(start, end, prefix);
- }
-
- public static Set<String> getAliasesForDateRange(DateTime startDate, DateTime endDate, String prefix) {
- Set<String> aliases = new HashSet<String>();
- aliases.add(prefix+"_"+getDateAbbreviation(startDate.getYear(), startDate.getMonthOfYear()));
- if(endDate == null) {
- return aliases;
- }
- while(endDate.isAfter(startDate)) {
- aliases.add(prefix+"_"+getDateAbbreviation(endDate.getYear(), endDate.getMonthOfYear()));
- endDate = endDate.minusMonths(1);
- }
- return aliases;
- }
-
- private static String getDateAbbreviation(int year, int month) {
- if(month > 9) {
- return Integer.toString(year)+Integer.toString(month);
- }
- else {
- return Integer.toString(year)+"0"+Integer.toString(month);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
index 1972bc7..2d129de 100644
--- a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
@@ -29,20 +29,26 @@ import java.nio.charset.Charset;
*/
public class GuidUtils {
- private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
- public static String generateGuid(String... parts) {
+ /**
+ * generateGuid from list of parts.
+ * @param parts list of parts
+ * @return guid
+ */
+ public static String generateGuid(String... parts) {
- StringBuilder seed = new StringBuilder();
- for( String part : parts ) {
- Preconditions.checkNotNull(part);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(part));
- seed.append(part);
- }
+ StringBuilder seed = new StringBuilder();
- String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).asBytes().toString();
+ for ( String part : parts ) {
+ Preconditions.checkNotNull(part);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(part));
+ seed.append(part);
+ }
- return hash;
+ String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).asBytes().toString();
- }
+ return hash;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
index de324d2..ba22d3d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -20,7 +20,11 @@ package org.apache.streams.util;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
/**
* SerializationUtil contains methods for serializing, deserializing, and cloning
@@ -28,47 +32,62 @@ import java.io.*;
*/
public class SerializationUtil {
- /**
- * BORROwED FROM APACHE STORM PROJECT
- * @param obj
- * @return
- */
- public static byte[] serialize(Object obj) {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(obj);
- oos.close();
- return bos.toByteArray();
- } catch(IOException ioe) {
- throw new RuntimeException(ioe);
- }
+ /**
+ * serialize Object as byte array.
+ *
+ * <p/>
+ * BORROWED FROM APACHE STORM PROJECT
+ *
+ * @param obj Object
+ * @return byte[]
+ */
+ public static byte[] serialize(Object obj) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(obj);
+ oos.close();
+ return bos.toByteArray();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
}
+ }
- /**
- * BORROwED FROM APACHE STORM PROJECT
- * @param serialized
- * @return
- */
- public static Object deserialize(byte[] serialized) {
- try {
- ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis);
- Object ret = ois.readObject();
- ois.close();
- return ret;
- } catch(IOException ioe) {
- throw new RuntimeException(ioe);
- } catch(ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
+ /**
+ * deserialize byte array as Object.
+ *
+ * <p/>
+ * BORROWED FROM APACHE STORM PROJECT
+ *
+ * @param serialized byte[]
+ * @return Object
+ */
+ public static Object deserialize(byte[] serialized) {
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis);
+ Object ret = ois.readObject();
+ ois.close();
+ return ret;
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
}
+ }
-
- public static <T> T cloneBySerialization(T obj) {
- if( obj != null )
- return (T) deserialize(serialize(obj));
- else return null;
+ /**
+ * clone Object by serialization.
+ * @param obj Object
+ * @param <T> type
+ * @return cloned Object
+ */
+ public static <T> T cloneBySerialization(T obj) {
+ if ( obj != null ) {
+ return (T) deserialize(serialize(obj));
+ } else {
+ return null;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
index 7fbfc6b..3dc3e08 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
@@ -22,61 +22,64 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class AbstractBackOffStrategy implements BackOffStrategy {
- private long baseSleepTime;
- private long lastSleepTime;
- private int maxAttempts;
- private AtomicInteger attemptsCount;
+ private long baseSleepTime;
+ private long lastSleepTime;
+ private int maxAttempts;
+ private AtomicInteger attemptsCount;
- /**
- * A BackOffStrategy that can effectively be used endlessly.
- * @param baseBackOffTime amount of time back of in seconds
- */
- public AbstractBackOffStrategy(long baseBackOffTime) {
- this(baseBackOffTime, -1);
- }
+ /**
+ * A BackOffStrategy that can effectively be used endlessly.
+ * @param baseBackOffTime amount of time to back off, in seconds
+ */
+ public AbstractBackOffStrategy(long baseBackOffTime) {
+ this(baseBackOffTime, -1);
+ }
- /**
- * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
- * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
- * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
- */
- public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
- if(baseBackOffTime <= 0) {
- throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime);
- }
- if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) {
- throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts);
- }
- this.baseSleepTime = baseBackOffTime;
- this.maxAttempts = maximumNumberOfBackOffAttempts;
- this.attemptsCount = new AtomicInteger(0);
+ /**
+ * A BackOffStrategy that has a limited number of uses before it throws a
+ * {@link org.apache.streams.util.api.requests.backoff.BackOffException}.
+ * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
+ * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than 0 or -1.
+ * -1 indicates there is no maximum number of attempts.
+ */
+ public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
+ if (baseBackOffTime <= 0) {
+ throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : " + baseBackOffTime);
}
-
- @Override
- public void backOff() throws BackOffException {
- int attempt = this.attemptsCount.getAndIncrement();
- if(attempt >= this.maxAttempts && this.maxAttempts != -1) {
- throw new BackOffException(attempt, this.lastSleepTime);
- } else {
- try {
- Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime));
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
+ if (maximumNumberOfBackOffAttempts <= 0 && maximumNumberOfBackOffAttempts != -1) {
+ throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : " + maximumNumberOfBackOffAttempts);
}
+ this.baseSleepTime = baseBackOffTime;
+ this.maxAttempts = maximumNumberOfBackOffAttempts;
+ this.attemptsCount = new AtomicInteger(0);
+ }
- @Override
- public void reset() {
- this.attemptsCount.set(0);
+ @Override
+ public void backOff() throws BackOffException {
+ int attempt = this.attemptsCount.getAndIncrement();
+ if (attempt >= this.maxAttempts && this.maxAttempts != -1) {
+ throw new BackOffException(attempt, this.lastSleepTime);
+ } else {
+ try {
+ Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
}
+ }
+
+ @Override
+ public void reset() {
+ this.attemptsCount.set(0);
+ }
- /**
- * Calculate the amount of time in milliseconds that the strategy should back off for
- * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
- * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
- * @return the amount of time it should back off in milliseconds
- */
- protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
+ /**
+ * Calculate the amount of time in milliseconds that the strategy should back off for
+ * @param attemptCount the number of attempts the strategy has backed off.
+ * i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
+ * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
+ * @return the amount of time it should back off in milliseconds
+ */
+ protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
index 223303c..692c0b6 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
@@ -21,43 +21,49 @@ package org.apache.streams.util.api.requests.backoff;
*/
public class BackOffException extends Exception {
- private int attemptCount;
- private long sleepTime;
-
- public BackOffException() {
- this(-1, -1);
- }
-
- public BackOffException(String message) {
- this(message, -1, -1);
- }
-
- public BackOffException(int attemptCount, long maxSleepTime) {
- this.attemptCount = attemptCount;
- this.sleepTime = maxSleepTime;
- }
-
- public BackOffException(String message, int attemptCount, long maxSleepTime) {
- super(message);
- this.attemptCount = attemptCount;
- this.sleepTime = maxSleepTime;
- }
-
- /**
- * Gets the number of back off attempts that happened before the exception was thrown. If the function that
- * initialized this exception does not set the number of attempts, -1 will be returned.
- * @return number of attempts
- */
- public int getNumberOfBackOffsAttempted() {
- return this.attemptCount;
- }
-
- /**
- * Gets the longest sleep period that the strategy attempted. If the function that
- * initialized this exception does not set the longest sleep period, -1 will be returned.
- * @return
- */
- public long getLongestBackOff() {
- return this.sleepTime;
- }
+ private int attemptCount;
+ private long sleepTime;
+
+ public BackOffException() {
+ this(-1, -1);
+ }
+
+ public BackOffException(String message) {
+ this(message, -1, -1);
+ }
+
+ public BackOffException(int attemptCount, long maxSleepTime) {
+ this.attemptCount = attemptCount;
+ this.sleepTime = maxSleepTime;
+ }
+
+ /**
+ * BackOffException constructor.
+ * @param message message
+ * @param attemptCount attemptCount
+ * @param maxSleepTime maxSleepTime (in millis)
+ */
+ public BackOffException(String message, int attemptCount, long maxSleepTime) {
+ super(message);
+ this.attemptCount = attemptCount;
+ this.sleepTime = maxSleepTime;
+ }
+
+ /**
+ * Gets the number of back off attempts that happened before the exception was thrown. If the function that
+ * initialized this exception does not set the number of attempts, -1 will be returned.
+ * @return number of back off attempts
+ */
+ public int getNumberOfBackOffsAttempted() {
+ return this.attemptCount;
+ }
+
+ /**
+ * Gets the longest sleep period that the strategy attempted. If the function that
+ * initialized this exception does not set the longest sleep period, -1 will be returned.
+ * @return longest sleep period that the strategy attempted
+ */
+ public long getLongestBackOff() {
+ return this.sleepTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
index a0d80e8..44497ab 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
@@ -19,6 +19,7 @@ package org.apache.streams.util.api.requests.backoff;
* BackOffStrategy will cause the current thread to sleep for a specific amount of time. This is used to adhere to
* api rate limits.
*
+ * <p/>
* The example below illustrates using a BackOffStrategy to slow down requests when you hit a rate limit exception.
*
* <code>
@@ -36,16 +37,17 @@ package org.apache.streams.util.api.requests.backoff;
*/
public interface BackOffStrategy {
- /**
- * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
- * on the number of times the backOff can be called, an exception will be thrown.
- * @throws BackOffException
- */
- public void backOff() throws BackOffException;
+ /**
+ * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
+ * on the number of times the backOff can be called, an exception will be thrown.
+ * @throws BackOffException BackOffException
+ */
+ public void backOff() throws BackOffException;
- /**
- * Rests the back off strategy to its original state. After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()}
- * has never been called.
- */
- public void reset();
+ /**
+ * Resets the back off strategy to its original state.
+ * After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()}
+ * has never been called.
+ */
+ public void reset();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
index b3fd3f2..26ec225 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
@@ -24,25 +24,27 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
*/
public class ConstantTimeBackOffStrategy extends AbstractBackOffStrategy {
- /**
- * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
- * @param baseBackOffTimeInMiliseconds amount of time back of in milliseconds
- */
- public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
- this(baseBackOffTimeInMiliseconds, -1);
- }
+ /**
+ * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
+ * @param baseBackOffTimeInMiliseconds amount of time to back off, in milliseconds
+ */
+ public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
+ this(baseBackOffTimeInMiliseconds, -1);
+ }
- /**
- * A ConstantTimeBackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
- * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0.
- * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
- */
- public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) {
- super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
- }
+ /**
+ * A ConstantTimeBackOffStrategy that has a limited number of uses before it
+ * throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+ * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0.
+ * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than 0 or -1.
+ * -1 indicates there is no maximum number of attempts.
+ */
+ public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) {
+ super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
+ }
- @Override
- protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
- return baseSleepTime;
- }
+ @Override
+ protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+ return baseSleepTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
index a5a9656..0962984 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
@@ -18,30 +18,29 @@ package org.apache.streams.util.api.requests.backoff.impl;
import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
/**
- * Exponential backk strategy. Caluclated by baseBackOffTimeInSeconds raised the attempt-count power.
+ * Exponential backoff strategy. Calculated by baseBackOffTimeInSeconds raised to the attempt-count power.
*/
public class ExponentialBackOffStrategy extends AbstractBackOffStrategy {
+ /**
+ * Unlimited use ExponentialBackOffStrategy.
+ * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds
+ */
+ public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
+ this(baseBackOffTimeInSeconds, -1);
+ }
- /**
- * Unlimited use ExponentialBackOffStrategy
- * @param baseBackOffTimeInSeconds
- */
- public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
- this(baseBackOffTimeInSeconds, -1);
- }
+ /**
+ * Limited use ExponentialBackOffStrategy.
+ * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds
+ * @param maxNumAttempts maxNumAttempts
+ */
+ public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
+ super(baseBackOffTimeInSeconds, maxNumAttempts);
+ }
- /**
- * Limited use ExponentialBackOffStrategy
- * @param baseBackOffTimeInSeconds
- * @param maxNumAttempts
- */
- public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
- super(baseBackOffTimeInSeconds, maxNumAttempts);
- }
-
- @Override
- protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
- return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
- }
+ @Override
+ protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+ return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
index 38d05a1..d6f323f 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
@@ -24,17 +24,16 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
*/
public class LinearTimeBackOffStrategy extends AbstractBackOffStrategy {
+ public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
+ this(baseBackOffTimeInSeconds, -1);
+ }
- public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
- this(baseBackOffTimeInSeconds, -1);
- }
+ public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
+ super(baseBackOffTimeInSeconds, -1);
+ }
- public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
- super(baseBackOffTimeInSeconds, -1);
- }
-
- @Override
- protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
- return 1000L * attemptCount * baseSleepTime;
- }
+ @Override
+ protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+ return 1000L * attemptCount * baseSleepTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
index dfdec72..41ec4b6 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -12,22 +12,23 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License. */
+
package org.apache.streams.util.oauth.tokens;
/**
- *
+ * AbstractOauthToken.
*/
public abstract class AbstractOauthToken {
- /**
- * Must create equals method for all OauthTokens.
- * @param o
- * @return true if equal, and false otherwise
- */
- protected abstract boolean internalEquals(Object o);
+ /**
+ * Must create equals method for all OauthTokens.
+ * @param object object for comparison
+ * @return true if equal, and false otherwise
+ */
+ protected abstract boolean internalEquals(Object object);
- @Override
- public boolean equals(Object o) {
- return this.internalEquals(o);
- }
+ @Override
+ public boolean equals(Object object) {
+ return this.internalEquals(object);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index fed194f..7b3f370 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -12,42 +12,40 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License. */
-package org.apache.streams.util.oauth.tokens.tokenmanager;
-import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
+package org.apache.streams.util.oauth.tokens.tokenmanager;
import java.util.Collection;
/**
- * Manges access to oauth tokens. Allows a caller to add tokens to the token pool and receive an available token.
+ * Manages access to oauth tokens. Allows a caller to add tokens to the token pool and receive an available token.
*/
public interface SimpleTokenManager<T> {
-
- /**
- * Adds a token to the available token pool.
- * @param token Token to be added
- * @return true, if token was successfully added to the pool and false otherwise.
- */
- public boolean addTokenToPool(T token);
-
- /**
- * Adds a {@link java.util.Collection} of tokens to the available token pool.
- * @param tokens Tokens to be added
- * @return true, if the token pool size increased after adding the tokens, and false otherwise.
- */
- public boolean addAllTokensToPool(Collection<T> tokens);
-
- /**
- * Get an available token. If no tokens are available it returns null.
- * @return next available token
- */
- public T getNextAvailableToken();
-
- /**
- * Get the number of available tokens
- * @return number of available tokens
- */
- public int numAvailableTokens();
+ /**
+ * Adds a token to the available token pool.
+ * @param token Token to be added
+ * @return true, if token was successfully added to the pool and false otherwise.
+ */
+ public boolean addTokenToPool(T token);
+
+ /**
+ * Adds a {@link java.util.Collection} of tokens to the available token pool.
+ * @param tokens Tokens to be added
+ * @return true, if the token pool size increased after adding the tokens, and false otherwise.
+ */
+ public boolean addAllTokensToPool(Collection<T> tokens);
+
+ /**
+ * Get an available token. If no tokens are available it returns null.
+ * @return next available token
+ */
+ public T getNextAvailableToken();
+
+ /**
+ * Get the number of available tokens.
+ * @return number of available tokens
+ */
+ public int numAvailableTokens();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
new file mode 100644
index 0000000..7c1a9e3
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+
+package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
+
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Manages a pool of tokens the most basic possible way.
+ * If all tokens are added to the manager before {@link BasicTokenManager#getNextAvailableToken() getNextAvailableToken}
+ * is called tokens are issued in the order they were added to the manager, FIFO. The BasicTokenManager acts as a circular queue
+ * of tokens. Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again.
+ *
+ * <p/>
+ * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool.
+ *
+ * <p/>
+ * The manager class is thread safe.
+ */
+public class BasicTokenManager<T> implements SimpleTokenManager<T> {
+
+ private ArrayList<T> availableTokens;
+ private int nextToken;
+
+ public BasicTokenManager() {
+ this(null);
+ }
+
+ /**
+ * BasicTokenManager constructor.
+ * @param tokens Collection of tokens
+ */
+ public BasicTokenManager(Collection<T> tokens) {
+ if (tokens != null) {
+ this.availableTokens = new ArrayList<T>(tokens.size());
+ this.addAllTokensToPool(tokens);
+ } else {
+ this.availableTokens = new ArrayList<T>();
+ }
+ this.nextToken = 0;
+ }
+
+ @Override
+ public synchronized boolean addTokenToPool(T token) {
+ if (token == null || this.availableTokens.contains(token)) {
+ return false;
+ } else {
+ return this.availableTokens.add(token);
+ }
+ }
+
+ @Override
+ public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
+ int startSize = this.availableTokens.size();
+ for (T token : tokens) {
+ this.addTokenToPool(token);
+ }
+ return startSize < this.availableTokens.size();
+ }
+
+ @Override
+ public synchronized T getNextAvailableToken() {
+ T token = null;
+ if (this.availableTokens.size() == 0) {
+ return token;
+ } else {
+ token = this.availableTokens.get(nextToken++);
+ if (nextToken == this.availableTokens.size()) {
+ nextToken = 0;
+ }
+ return token;
+ }
+ }
+
+ @Override
+ public synchronized int numAvailableTokens() {
+ return this.availableTokens.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
deleted file mode 100644
index 4c64bf7..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
-
-import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Manages a pool of tokens the most basic possible way. If all tokens are added to the manager before {@link BasicTokenManger#getNextAvailableToken() getNextAvailableToken}
- * is called tokens are issued in the order they were added to the manager, FIFO. The BasicTokenManager acts as a circular queue
- * of tokens. Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again.
- *
- * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool.
- *
- * The manager class is thread safe.
- */
-public class BasicTokenManger<T> implements SimpleTokenManager<T>{
-
- private ArrayList<T> availableTokens;
- private int nextToken;
-
- public BasicTokenManger() {
- this(null);
- }
-
- public BasicTokenManger(Collection<T> tokens) {
- if(tokens != null) {
- this.availableTokens = new ArrayList<T>(tokens.size());
- this.addAllTokensToPool(tokens);
- } else {
- this.availableTokens = new ArrayList<T>();
- }
- this.nextToken = 0;
- }
-
- @Override
- public synchronized boolean addTokenToPool(T token) {
- if(token == null || this.availableTokens.contains(token))
- return false;
- else
- return this.availableTokens.add(token);
- }
-
- @Override
- public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
- int startSize = this.availableTokens.size();
- for(T token : tokens) {
- this.addTokenToPool(token);
- }
- return startSize < this.availableTokens.size();
- }
-
- @Override
- public synchronized T getNextAvailableToken() {
- T token = null;
- if(this.availableTokens.size() == 0) {
- return token;
- } else {
- token = this.availableTokens.get(nextToken++);
- if(nextToken == this.availableTokens.size()) {
- nextToken = 0;
- }
- return token;
- }
- }
-
- @Override
- public synchronized int numAvailableTokens() {
- return this.availableTokens.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
index 450851e..57a1d44 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.util.schema;
/**
@@ -22,10 +23,10 @@ package org.apache.streams.util.schema;
* be able to translate.
*/
public enum FieldType {
- STRING,
- INTEGER,
- NUMBER,
- BOOLEAN,
- OBJECT,
- ARRAY
+ STRING,
+ INTEGER,
+ NUMBER,
+ BOOLEAN,
+ OBJECT,
+ ARRAY
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
index 6582565..a437ca4 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
@@ -15,11 +15,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.util.schema;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -27,25 +25,32 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
*/
public class FieldUtil {
- public static FieldType determineFieldType(ObjectNode fieldNode) {
- String typeSchemaField = "type";
- if( !fieldNode.has(typeSchemaField))
- return null;
- String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
- if( typeSchemaFieldValue.equals("string")) {
- return FieldType.STRING;
- } else if( typeSchemaFieldValue.equals("integer")) {
- return FieldType.INTEGER;
- } else if( typeSchemaFieldValue.equals("number")) {
- return FieldType.NUMBER;
- } else if( typeSchemaFieldValue.equals("object")) {
- return FieldType.OBJECT;
- } else if( typeSchemaFieldValue.equals("boolean")) {
- return FieldType.BOOLEAN;
- } else if( typeSchemaFieldValue.equals("array")) {
- return FieldType.ARRAY;
- }
- else return null;
+ /**
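+ * Determine the FieldType named by the "type" property of the given node.
+ * @param fieldNode ObjectNode to inspect
+ * @return the matching FieldType, or null if "type" is missing or not a recognized value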
+ */
+ public static FieldType determineFieldType(ObjectNode fieldNode) {
+ String typeSchemaField = "type";
+ if ( !fieldNode.has(typeSchemaField)) {
+ return null;
+ }
+ String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
+ if ( typeSchemaFieldValue.equals("string")) {
+ return FieldType.STRING;
+ } else if ( typeSchemaFieldValue.equals("integer")) {
+ return FieldType.INTEGER;
+ } else if ( typeSchemaFieldValue.equals("number")) {
+ return FieldType.NUMBER;
+ } else if ( typeSchemaFieldValue.equals("object")) {
+ return FieldType.OBJECT;
+ } else if ( typeSchemaFieldValue.equals("boolean")) {
+ return FieldType.BOOLEAN;
+ } else if ( typeSchemaFieldValue.equals("array")) {
+ return FieldType.ARRAY;
+ } else {
+ return null;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
index c51339a..5acd5a8 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.util.schema;
import com.google.common.base.Preconditions;
@@ -35,60 +36,93 @@ import java.util.List;
*/
public class FileUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
- public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) {
- if(Strings.isNullOrEmpty(sourceDirectory))
- return inputFile;
- else {
- try {
- if( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) {
- return inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1);
- }
- } catch( Throwable e ) {
- return inputFile;
- }
+ /**
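+ * Drop the part of inputFile up to and including sourceDirectory and the one character after it.
+ * @param inputFile path to shorten
+ * @param sourceDirectory directory to strip; when null or empty, inputFile is returned as is
+ * @return the remainder, or inputFile unchanged when sourceDirectory is absent or first occurs at index 0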
+ */
+ public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) {
+ if (Strings.isNullOrEmpty(sourceDirectory)) {
+ return inputFile;
+ } else {
+ try {
+ if ( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) {
+ return inputFile.substring(inputFile.indexOf(sourceDirectory) + sourceDirectory.length() + 1);
}
+ } catch ( Throwable throwable ) {
return inputFile;
+ }
}
+ return inputFile;
+ }
- public static String swapExtension(String inputFile, String originalExtension, String newExtension) {
- if(inputFile.endsWith("."+originalExtension))
- return inputFile.replace("."+originalExtension, "."+newExtension);
- else return inputFile;
+ /**
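+ * Swap the extension of inputFile when it ends with "." + originalExtension.
+ * @param inputFile file name or path to rewrite
+ * @param originalExtension extension to look for, without the leading dot
+ * @param newExtension extension to substitute, without the leading dot
+ * @return inputFile with each ".originalExtension" replaced by ".newExtension", or inputFile unchanged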
+ */
+ public static String swapExtension(String inputFile, String originalExtension, String newExtension) {
+ if (inputFile.endsWith("." + originalExtension)) {
+ return inputFile.replace("." + originalExtension, "." + newExtension);
+ } else {
+ return inputFile;
}
+ }
- public static String dropExtension(String inputFile) {
- if(inputFile.contains("."))
- return inputFile.substring(0, inputFile.lastIndexOf("."));
- else return inputFile;
+ /**
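+ * Drop the last "." of inputFile and everything after it.
+ * @param inputFile file name or path to shorten
+ * @return inputFile cut off before its last ".", or inputFile unchanged when it contains no "."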
+ */
+ public static String dropExtension(String inputFile) {
+ if (inputFile.contains(".")) {
+ return inputFile.substring(0, inputFile.lastIndexOf("."));
+ } else {
+ return inputFile;
}
+ }
- public static void writeFile(String resourceFile, String resourceContent) {
- try {
- File path = new File(resourceFile);
- File dir = path.getParentFile();
- if( !dir.exists() )
- dir.mkdirs();
- Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW);
- } catch (Exception e) {
- LOGGER.error("Write Exception: {}", e);
- }
+ /**
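+ * Write resourceContent to a new file, creating the parent directory if needed; exceptions are logged.
+ * @param resourceFile path of the file to create (opened with CREATE_NEW)
+ * @param resourceContent text to write, encoded with getBytes()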
+ */
+ public static void writeFile(String resourceFile, String resourceContent) {
+ try {
+ File path = new File(resourceFile);
+ File dir = path.getParentFile();
+ if ( !dir.exists() ) {
+ dir.mkdirs();
+ }
+ Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW);
+ } catch (Exception ex) {
+ LOGGER.error("Write Exception: {}", ex);
}
+ }
- public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) {
-
- Preconditions.checkArgument(schemaFiles.size() > 0);
- int i = 0;
- while( schemaFiles.size() > i) {
- File child = schemaFiles.get(i);
- if (child.isDirectory()) {
- schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
- schemaFiles.remove(child);
- } else {
- i += 1;
- }
- }
+ /**
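+ * Expand schemaFiles in place: each directory is replaced by its children that pass the file filter.
+ * @param config GenerationConfig supplying the file filter
+ * @param schemaFiles non-empty, modifiable list of files and directories to expand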
+ */
+ public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) {
+ Preconditions.checkArgument(schemaFiles.size() > 0);
+ int index = 0;
+ while ( schemaFiles.size() > index) {
+ File child = schemaFiles.get(index);
+ if (child.isDirectory()) {
+ schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
+ schemaFiles.remove(child);
+ } else {
+ index += 1;
+ }
}
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
index c48d186..d7fa7e7 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
@@ -15,6 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.util.schema;
import java.io.File;
@@ -24,110 +25,39 @@ import java.util.Iterator;
/**
* GenerationConfig represents the common fields and field accessors for
- * streams modules that transform schemas into generated-sources or generated-resources
+ * streams modules that transform schemas into generated-sources or generated-resources.
*/
public interface GenerationConfig {
- /**
- * Gets the 'source' configuration option.
- *
- * @return The source file(s) or directory(ies) from which JSON Schema will
- * be read.
- */
- Iterator<URL> getSource();
-
- /**
- * Gets the 'targetDirectory' configuration option.
- *
- * @return The target directory into which generated types will be written
- * (may or may not exist before types are written)
- */
- File getTargetDirectory();
-
- /**
- * Gets the 'outputEncoding' configuration option.
- *
- * @return The character encoding that should be used when writing output files.
- */
- String getOutputEncoding();
-
- /**
- * Gets the file filter used to isolate the schema mapping files in the
- * source directories.
- *
- * @return the file filter use when scanning for schema files.
- */
- FileFilter getFileFilter();
-
- /**
- * Gets the 'includeAdditionalProperties' configuration option.
- *
- * @return Whether to allow 'additional properties' support in objects.
- * Setting this to false will disable additional properties support,
- * regardless of the input schema(s).
- */
-// boolean isIncludeAdditionalProperties();
-
- /**
- * Gets the 'targetVersion' configuration option.
- *
- * @return The target version for generated source files.
- */
-// String getTargetVersion();
-
-// /**
-// * Gets the `includeDynamicAccessors` configuraiton option.
-// *
-// * @return Whether to include dynamic getters, setters, and builders
-// * or to omit these methods.
-// */
-// boolean isIncludeDynamicAccessors();
-
-// /**
-// * Gets the `dateTimeType` configuration option.
-// * <p>
-// * Example values:
-// * <ul>
-// * <li><code>org.joda.time.LocalDateTime</code> (Joda)</li>
-// * <li><code>java.time.LocalDateTime</code> (JSR310)</li>
-// * <li><code>null</code> (default behavior)</li>
-// * </ul>
-// *
-// * @return The java type to use instead of {@link java.util.Date}
-// * when adding date type fields to generate Java types.
-// */
-// String getDateTimeType();
-//
-// /**
-// * Gets the `dateType` configuration option.
-// * <p>
-// * Example values:
-// * <ul>
-// * <li><code>org.joda.time.LocalDate</code> (Joda)</li>
-// * <li><code>java.time.LocalDate</code> (JSR310)</li>
-// * <li><code>null</code> (default behavior)</li>
-// * </ul>
-// *
-// * @return The java type to use instead of string
-// * when adding string type fields with a format of date (not
-// * date-time) to generated Java types.
-// */
-// String getDateType();
-//
-// /**
-// * Gets the `timeType` configuration option.
-// * <p>
-// * Example values:
-// * <ul>
-// * <li><code>org.joda.time.LocalTime</code> (Joda)</li>
-// * <li><code>java.time.LocalTime</code> (JSR310)</li>
-// * <li><code>null</code> (default behavior)</li>
-// * </ul>
-// *
-// * @return The java type to use instead of string
-// * when adding string type fields with a format of time (not
-// * date-time) to generated Java types.
-// */
-// String getTimeType();
+ /**
+ * Gets the 'source' configuration option.
+ *
+ * @return The source file(s) or directory(ies) from which JSON Schema will
+ * be read.
+ */
+ Iterator<URL> getSource();
+
+ /**
+ * Gets the 'targetDirectory' configuration option.
+ *
+ * @return The target directory into which generated types will be written
+ * (may or may not exist before types are written)
+ */
+ File getTargetDirectory();
+
+ /**
+ * Gets the 'outputEncoding' configuration option.
+ *
+ * @return The character encoding that should be used when writing output files.
+ */
+ String getOutputEncoding();
+
+ /**
+ * Gets the file filter used to isolate the schema mapping files in the
+ * source directories.
+ *
+ * @return the file filter used when scanning for schema files.
+ */
+ FileFilter getFileFilter();
}