You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@streams.apache.org by steveblackmon <gi...@git.apache.org> on 2014/06/26 00:25:29 UTC

[GitHub] incubator-streams pull request: Streams 47

GitHub user steveblackmon opened a pull request:

    https://github.com/apache/incubator-streams/pull/43

    Streams 47

    Working mongo reader/writer

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-streams STREAMS-47

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-streams/pull/43.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #43
    
----
commit 74a38642e6eda05ee6603a66a1e0ba7ea35815d0
Author: sblackmon <sb...@w2odigital.com>
Date:   2014-06-25T21:14:34Z

    Created MongoReader.  Tested as pertual stream

commit cae94d3f713c02a9d8689ec064d99328d4d60bf4
Author: sblackmon <sb...@w2odigital.com>
Date:   2014-06-25T21:47:29Z

    STREAMS-47

commit e9ed506ccd446e17a14a541610ad12ad7f2f44a8
Author: sblackmon <sb...@w2odigital.com>
Date:   2014-06-25T21:52:28Z

    added README

commit 1f1c1db8fae16bff28021553d4f64dd16c550407
Author: sblackmon <sb...@w2odigital.com>
Date:   2014-06-25T22:00:37Z

    added README

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14251038
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    --- End diff --
    
    Why not make this a sub class in the persist reader?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14251050
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
    +
    +    private MongoPersistReader reader;
    +
    +    public MongoPersistReaderTask(MongoPersistReader reader) {
    +        this.reader = reader;
    +    }
    +
    +    @Override
    +    public void run() {
    +
    +        try {
    +            while(reader.cursor.hasNext()) {
    +                DBObject dbObject = reader.cursor.next();
    +                StreamsDatum datum = reader.prepareDatum(dbObject);
    +                write(datum);
    +            }
    +        } finally {
    +            reader.cursor.close();
    +        }
    +
    +    }
    +
    +    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
    +    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
    +    //if the queue is being replaced with a new instance
    +    protected void write(StreamsDatum entry) {
    --- End diff --
    
    I don't see why this is in this class.  It should be in the persist reader class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14455856
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    --- End diff --
    
    I don't really have a strong opinion as to whether or not it is an inner class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14271228
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    --- End diff --
    
    never been a fan of inner classes.  but i don't feel strongly about what file the logic sits in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14490248
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    --- End diff --
    
    Moved Task to inner class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-streams/pull/43


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14490256
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
    +
    +    private MongoPersistReader reader;
    +
    +    public MongoPersistReaderTask(MongoPersistReader reader) {
    +        this.reader = reader;
    +    }
    +
    +    @Override
    +    public void run() {
    +
    +        try {
    +            while(reader.cursor.hasNext()) {
    +                DBObject dbObject = reader.cursor.next();
    +                StreamsDatum datum = reader.prepareDatum(dbObject);
    +                write(datum);
    +            }
    +        } finally {
    +            reader.cursor.close();
    +        }
    +
    +    }
    +
    +    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
    +    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
    +    //if the queue is being replaced with a new instance
    +    protected void write(StreamsDatum entry) {
    --- End diff --
    
    Removed write method from Task as suggested


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 47

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/43#discussion_r14455886
  
    --- Diff: streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.streams.mongo;
    +
    +import com.google.common.base.Strings;
    +import com.mongodb.DBObject;
    +import org.apache.streams.core.DatumStatus;
    +import org.apache.streams.core.StreamsDatum;
    +import org.joda.time.DateTime;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +
    +public class MongoPersistReaderTask implements Runnable {
    +
    +    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
    +
    +    private MongoPersistReader reader;
    +
    +    public MongoPersistReaderTask(MongoPersistReader reader) {
    +        this.reader = reader;
    +    }
    +
    +    @Override
    +    public void run() {
    +
    +        try {
    +            while(reader.cursor.hasNext()) {
    +                DBObject dbObject = reader.cursor.next();
    +                StreamsDatum datum = reader.prepareDatum(dbObject);
    +                write(datum);
    +            }
    +        } finally {
    +            reader.cursor.close();
    +        }
    +
    +    }
    +
    +    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
    +    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
    +    //if the queue is being replaced with a new instance
    +    protected void write(StreamsDatum entry) {
    --- End diff --
    
    IMO, a class should not break encapsulation in this manner.  It would be better to move this write method to the persist reader class, then your task class can call reader.write(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---