You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/07/16 11:10:28 UTC
[1/2] storm git commit: STORM-2631: Apply new code style to storm-sql-mongodb
Repository: storm
Updated Branches:
refs/heads/master c22f9972b -> 15cedbe7f
STORM-2631: Apply new code style to storm-sql-mongodb
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38c91267
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38c91267
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38c91267
Branch: refs/heads/master
Commit: 38c91267278ecf40e14c5f2da9ddc53624c08c69
Parents: d7c7818
Author: Xin Wang <be...@163.com>
Authored: Sat Jul 15 11:43:22 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sat Jul 15 11:43:22 2017 +0800
----------------------------------------------------------------------
.../storm-sql-mongodb/pom.xml | 5 +-
.../sql/mongodb/MongoDataSourcesProvider.java | 136 ++++++++++---------
2 files changed, 70 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/38c91267/sql/storm-sql-external/storm-sql-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
index 87dc564..82a7036 100644
--- a/sql/storm-sql-external/storm-sql-mongodb/pom.xml
+++ b/sql/storm-sql-external/storm-sql-mongodb/pom.xml
@@ -31,7 +31,7 @@
<developer>
<id>vesense</id>
<name>Xin Wang</name>
- <email>data.xinwang@gmail.com</email>
+ <email>xinwang@apache.org</email>
</developer>
</developers>
@@ -85,9 +85,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
- <configuration>
- <maxAllowedViolations>54</maxAllowedViolations>
- </configuration>
</plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/storm/blob/38c91267/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
index 60d52d1..f3682b8 100644
--- a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -15,9 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.sql.mongodb;
import com.google.common.base.Preconditions;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
@@ -36,10 +42,6 @@ import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.tuple.ITuple;
import org.bson.Document;
-import java.net.URI;
-import java.util.List;
-import java.util.Properties;
-
/**
* Create a MongoDB sink based on the URI and properties. The URI has the format of
* mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]].
@@ -47,80 +49,80 @@ import java.util.Properties;
*/
public class MongoDataSourcesProvider implements DataSourcesProvider {
- private static class MongoTridentDataSource implements ISqlTridentDataSource {
- private final String url;
- private final Properties props;
- private final IOutputSerializer serializer;
-
- private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
- this.url = url;
- this.props = props;
- this.serializer = serializer;
+ private static class MongoTridentDataSource implements ISqlTridentDataSource {
+ private final String url;
+ private final Properties props;
+ private final IOutputSerializer serializer;
+
+ private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+ this.url = url;
+ this.props = props;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public ITridentDataSource getProducer() {
+ throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+ }
+
+ @Override
+ public SqlTridentConsumer getConsumer() {
+ Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config");
+ String serField = props.getProperty("trident.ser.field", "tridentSerField");
+ MongoMapper mapper = new TridentMongoMapper(serField, serializer);
+
+ MongoState.Options options = new MongoState.Options()
+ .withUrl(url)
+ .withCollectionName(props.getProperty("collection.name"))
+ .withMapper(mapper);
+
+ StateFactory stateFactory = new MongoStateFactory(options);
+ StateUpdater stateUpdater = new MongoStateUpdater();
+
+ return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+ }
}
- @Override
- public ITridentDataSource getProducer() {
- throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+ private static class TridentMongoMapper implements MongoMapper {
+ private final String serField;
+ private final IOutputSerializer serializer;
+
+ private TridentMongoMapper(String serField, IOutputSerializer serializer) {
+ this.serField = serField;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public Document toDocument(ITuple tuple) {
+ Document document = new Document();
+ byte[] array = serializer.write(tuple.getValues(), null).array();
+ document.append(serField, array);
+ return document;
+ }
+
+ @Override
+ public Document toDocumentByKeys(List<Object> keys) {
+ return null;
+ }
}
@Override
- public SqlTridentConsumer getConsumer() {
- Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config");
- String serField = props.getProperty("trident.ser.field", "tridentSerField");
- MongoMapper mapper = new TridentMongoMapper(serField, serializer);
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
- .withCollectionName(props.getProperty("collection.name"))
- .withMapper(mapper);
-
- StateFactory stateFactory = new MongoStateFactory(options);
- StateUpdater stateUpdater = new MongoStateUpdater();
-
- return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
- }
- }
-
- private static class TridentMongoMapper implements MongoMapper {
- private final String serField;
- private final IOutputSerializer serializer;
-
- private TridentMongoMapper(String serField, IOutputSerializer serializer) {
- this.serField = serField;
- this.serializer = serializer;
+ public String scheme() {
+ return "mongodb";
}
@Override
- public Document toDocument(ITuple tuple) {
- Document document = new Document();
- byte[] array = serializer.write(tuple.getValues(), null).array();
- document.append(serField, array);
- return document;
+ public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+ List<FieldInfo> fields) {
+ throw new UnsupportedOperationException();
}
@Override
- public Document toDocumentByKeys(List<Object> keys) {
- return null;
+ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+ Properties properties, List<FieldInfo> fields) {
+ List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+ IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+ return new MongoTridentDataSource(uri.toString(), properties, serializer);
}
- }
-
- @Override
- public String scheme() {
- return "mongodb";
- }
-
- @Override
- public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
- List<FieldInfo> fields) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
- Properties properties, List<FieldInfo> fields) {
- List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
- IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
- return new MongoTridentDataSource(uri.toString(), properties, serializer);
- }
}
[2/2] storm git commit: Merge branch 'STORM-2631' of https://github.com/vesense/storm
Posted by xi...@apache.org.
Merge branch 'STORM-2631' of https://github.com/vesense/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/15cedbe7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/15cedbe7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/15cedbe7
Branch: refs/heads/master
Commit: 15cedbe7f899aaa49fbf3f4294c252ff74e1f519
Parents: c22f997 38c9126
Author: Xin Wang <be...@163.com>
Authored: Sun Jul 16 19:10:18 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Jul 16 19:10:18 2017 +0800
----------------------------------------------------------------------
.../storm-sql-mongodb/pom.xml | 5 +-
.../sql/mongodb/MongoDataSourcesProvider.java | 136 ++++++++++---------
2 files changed, 70 insertions(+), 71 deletions(-)
----------------------------------------------------------------------