You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:09 UTC
[7/8] git commit: Added licenses,
and more code formatting to comply with styles.
Added licenses, and more code formatting to comply with styles.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d1018e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d1018e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d1018e90
Branch: refs/heads/master
Commit: d1018e9070d8976ac3175f2b89af91c933d711f9
Parents: 361d122
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:32:35 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:32:35 2014 -0500
----------------------------------------------------------------------
.../streams-persist-s3/pom.xml | 17 +++++
.../org/apache/streams/s3/S3Configurator.java | 17 +++++
.../streams/s3/S3ObjectInputStreamWrapper.java | 17 +++++
.../streams/s3/S3OutputStreamWrapper.java | 17 +++++
.../org/apache/streams/s3/S3PersistReader.java | 17 +++++
.../apache/streams/s3/S3PersistReaderTask.java | 17 +++++
.../org/apache/streams/s3/S3PersistWriter.java | 76 +++++++++++++-------
.../TwitterUserInformationProvider.java | 24 ++++++-
8 files changed, 175 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
index 4e9b9b1..5cadd5c 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
@@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index 3413ef7..dfa0426 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index 900ebfb..c13314d 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.amazonaws.services.s3.model.S3Object;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index c488b48..08fc774 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.amazonaws.services.s3.AmazonS3Client;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 938dc66..5c7413e 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.amazonaws.ClientConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 5b4abe4..9967216 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.google.common.base.Strings;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 3685012..98671ba 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.s3;
import com.amazonaws.ClientConfiguration;
@@ -47,14 +64,33 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
private OutputStreamWriter currentWriter = null;
protected volatile Queue<StreamsDatum> persistQueue;
- public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; }
- public S3WriterConfiguration getS3WriterConfiguration() { return this.s3WriterConfiguration; }
- public List<String> getWrittenFiles() { return this.writtenFiles; }
- public Map<String, String> getObjectMetaData() { return this.objectMetaData; }
- public ObjectMapper getObjectMapper() { return this.objectMapper; }
+ public AmazonS3Client getAmazonS3Client() {
+ return this.amazonS3Client;
+ }
+
+ public S3WriterConfiguration getS3WriterConfiguration() {
+ return this.s3WriterConfiguration;
+ }
+
+ public List<String> getWrittenFiles() {
+ return this.writtenFiles;
+ }
- public void setObjectMapper(ObjectMapper mapper) { this.objectMapper = mapper; }
- public void setObjectMetaData(Map<String, String> val) { this.objectMetaData = val; }
+ public Map<String, String> getObjectMetaData() {
+ return this.objectMetaData;
+ }
+
+ public ObjectMapper getObjectMapper() {
+ return this.objectMapper;
+ }
+
+ public void setObjectMapper(ObjectMapper mapper) {
+ this.objectMapper = mapper;
+ }
+
+ public void setObjectMetaData(Map<String, String> val) {
+ this.objectMetaData = val;
+ }
/**
* Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
@@ -75,15 +111,13 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
@Override
public void write(StreamsDatum streamsDatum) {
- synchronized (this)
- {
+ synchronized (this) {
// Check to see if we need to reset the file that we are currently working with
if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
try {
LOGGER.info("Resetting the file");
this.currentWriter = resetFile();
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
@@ -108,8 +142,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}
- private synchronized OutputStreamWriter resetFile() throws Exception
- {
+ private synchronized OutputStreamWriter resetFile() throws Exception {
// this will keep it thread safe, so we don't create too many files
if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
return this.currentWriter;
@@ -117,8 +150,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
closeAndDestroyWriter();
// Create the path for where the file is going to live.
- try
- {
+ try {
// generate a file name
String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
(this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
@@ -142,9 +174,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
// return the output stream
return new OutputStreamWriter(outputStream);
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
LOGGER.error(e.getMessage());
throw e;
}
@@ -157,8 +187,8 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
this.closeSafely(this.currentWriter);
this.currentWriter = null;
- //
- LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
+ // Logging of information to alert the user to the activities of this class
+ LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
}
}
@@ -167,8 +197,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
try {
writer.flush();
writer.close();
- }
- catch(Exception e) {
+ } catch(Exception e) {
// noOp
}
LOGGER.debug("File Closed");
@@ -180,8 +209,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
if(flushable != null) {
try {
flushable.flush();
- }
- catch(IOException e) {
+ } catch(IOException e) {
// noOp
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d1018e90/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index dac5cd6..04aa1fe 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -1,7 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
@@ -9,13 +25,15 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.*;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;