Posted to dev@storm.apache.org by ptgoetz <gi...@git.apache.org> on 2014/05/29 21:53:54 UTC

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

GitHub user ptgoetz opened a pull request:

    https://github.com/apache/incubator-storm/pull/128

    STORM-211: Add module for HDFS integration

    More information on usage, etc. is available in the README.md.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ptgoetz/incubator-storm storm-hdfs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-storm/pull/128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #128
    
----
commit 18f5e6b216f1386a7e1c4e642944343193e9a022
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T15:22:26Z

    Initial commit

commit 54d15543df9121e8fe6682047cf69d6fd4b11198
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T15:26:47Z

    initial check-in

commit 34311bceac43b026d8d4a91644de1d7d733b5013
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T15:28:37Z

    Merge branch 'master' of github.com:ptgoetz/storm-hdfs

commit 736cd54a6041761eee3428bf6e8e498272b53aad
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T15:30:29Z

    update .gitignore

commit 8fb04abcd22d757fd7c07106bf99de45b19ef92f
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T15:47:23Z

    add basic usage example

commit 8c62c8f1aa9489c23d86be29172c65d0c0f1e98d
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T16:50:47Z

    update readme with basic usage information

commit 07733db07c2a2e549c6114ebc761e772dbbcdeae
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-27T16:53:39Z

    fix typo and re-order

commit 42d43f095c285d48effdef82599478df9fa69f60
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-28T19:48:26Z

    preliminary support for HDFS sequence files

commit 4ad401ad5bb466a4b6e22e3f1b956149e11d800d
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-01-28T19:50:10Z

    set version

commit fbda8b7000a88b606400c9ed93f9ba68b7b931a8
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-03T20:00:42Z

    use storm incubating version

commit 08bd4140c47b02d33d0f3741561f6a44c620a414
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-03T20:05:49Z

    refactor HdfsBolt/SequenceFileBolt

commit 7444918651d6a2c7595b03951d11f22671eca3b2
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-03T20:08:11Z

    add trident state implementation

commit ebc630286037ddc7c6764cbcc3609ab9c3307b93
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-03T20:25:56Z

    add trident sample to README

commit c5ba15fee214bd9ca0dbe9a1661aec7855fb2db1
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-05T01:35:45Z

    updated readme per issue #1

commit e7adf3f5706602ce481263478b73ba0160fdbd5f
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-05T06:15:04Z

    add rotation actions for bolts

commit 7085e2b19925bc95e7fe921e3979bb16a8c55d1a
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-07T19:31:09Z

    add RotationActions to State implementation.

commit e7a0bb86a76a73eadd047f24c4b60e488a4b1886
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-07T21:05:23Z

    refactor trident state implementation and add support for sequence files.

commit b19b7869cc7363658dd084b5d08e390cac625542
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-07T21:08:56Z

    rollback pom to storm 0.9.0.1

commit 2a0cce2e54eedb4f6f813c4f88b53e852909e1d1
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-08T07:02:55Z

    cleanup and documentation

commit 32826f0c3b908e2852b836f46c303a82b62d73bd
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-08T07:09:06Z

    documentation fix

commit b214f7231d03ff3c964afd88616e11de16b91a18
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-10T15:46:15Z

    cleanup and javadoc

commit a826fa33d18d41d6a3c9869c30c4e54f5a848e0a
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-10T15:46:45Z

    add samples to test directory

commit 98a70814025bff0a65c01d17c1a2550f17b7b38f
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-11T21:10:17Z

    move path configuration to FileNameFormat to allow dynamic paths

commit bd3d7805be7d8c57d39626b61e1e50240fa98824
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-11T21:12:55Z

    bump version for API change

commit 891f2f8efe22cad232f10fa6f7a1e327623159dc
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-02-11T21:17:26Z

    update README for API change

commit 0c754dc6d47c6b5b76bb06cace190b164350e53c
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-03-11T13:33:45Z

    update examples to get hdfs url from CLI arguments

commit 7a75c959eea518211af766ee0f1146dbc051ae0e
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-03-11T21:11:17Z

    add ability to specify hdfs url from command line in samples

commit f550621ea814af02014886a78b45584569b468fe
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-03-21T19:36:33Z

    fix issue #7

commit c8a511bd0d753bbb48a80a815998e8f7000c77a4
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-04-09T20:03:52Z

    fix issue with DelimitedRecordFormat

commit 15346a8533442dbd82806824556eb5fd2735e38e
Author: P. Taylor Goetz <pt...@gmail.com>
Date:   2014-04-14T17:51:20Z

    add ability to specify hadoop configuration parameters (override hdfs-site.xml)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-50528539
  
    @revans2 Addressed a number of your comments and added you as a Committer Sponsor.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13207702
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.format.RecordFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.EnumSet;
    +import java.util.Map;
    +
    +public class HdfsBolt extends AbstractHdfsBolt{
    +    private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class);
    +
    +    private FSDataOutputStream out;
    +    private RecordFormat format;
    +    private long offset = 0;
    +
    +    public HdfsBolt withFsUrl(String fsUrl){
    +        this.fsUrl = fsUrl;
    +        return this;
    +    }
    +
    +    public HdfsBolt withConfigKey(String configKey){
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public HdfsBolt withFileNameFormat(FileNameFormat fileNameFormat){
    +        this.fileNameFormat = fileNameFormat;
    +        return this;
    +    }
    +
    +    public HdfsBolt withRecordFormat(RecordFormat format){
    +        this.format = format;
    +        return this;
    +    }
    +
    +    public HdfsBolt withSyncPolicy(SyncPolicy syncPolicy){
    +        this.syncPolicy = syncPolicy;
    +        return this;
    +    }
    +
    +    public HdfsBolt withRotationPolicy(FileRotationPolicy rotationPolicy){
    +        this.rotationPolicy = rotationPolicy;
    +        return this;
    +    }
    +
    +    public HdfsBolt addRotationAction(RotationAction action){
    +        this.rotationActions.add(action);
    +        return this;
    +    }
    +
    +    @Override
    +    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
    +        LOG.info("Preparing HDFS Bolt...");
    +        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            byte[] bytes = this.format.format(tuple);
    +            out.write(bytes);
    +            this.offset += bytes.length;
    +
    +            this.collector.ack(tuple);
    --- End diff --
    
    Is there ever a use case to only ack the tuple after it has hit disk?  I guess that is kind of what the Trident implementations are for.
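    
    A minimal sketch of what that could look like (illustrative only, not the PR's code; `pending` is an assumed extra field, `hsync()` is the Hadoop 2.x durability call, and `syncPolicy.reset()` is assumed from the module's policy contract):
    
    ```java
    // Hold acks until the written bytes are durable on HDFS, trading
    // latency for a stronger delivery guarantee.
    private List<Tuple> pending = new ArrayList<Tuple>();   // assumed field
    
    @Override
    public void execute(Tuple tuple) {
        try {
            byte[] bytes = this.format.format(tuple);
            out.write(bytes);
            this.offset += bytes.length;
            pending.add(tuple);                              // defer the ack
            if (this.syncPolicy.mark(tuple, this.offset)) {
                out.hsync();                                 // data has hit disk
                for (Tuple t : pending) {
                    this.collector.ack(t);                   // now safe to ack
                }
                pending.clear();
                this.syncPolicy.reset();
            }
        } catch (IOException e) {
            this.collector.fail(tuple);                      // let Storm replay it
        }
    }
    ```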


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r14109626
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common.security;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.SecurityUtil;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class provides util methods for storm-hdfs connector communicating
    + * with secured HDFS.
    + */
    +public class HdfsSecurityUtil {
    +    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    +    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    --- End diff --
    
    Sounds like a good idea to me as well.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-46924306
  
    Thanks for the review @revans2.  I'll update accordingly.
    
    A lot of our users have found it useful as well.
    
    I'll add you to the sponsor list as well.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385186
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common.security;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.SecurityUtil;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class provides util methods for storm-hdfs connector communicating
    + * with secured HDFS.
    + */
    +public class HdfsSecurityUtil {
    +    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    +    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    +
    +    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
    --- End diff --
    
    It would be great, as part of this function, to check beforehand whether the user is already logged in.  On the security branch we added AutoTGT: if Hadoop is available on the class path, it will automatically log you into Hadoop using the TGT.  My concern here is that code running on multi-tenant Storm and single-tenant Storm will need to be different from one another.
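    
    A minimal sketch of the suggested guard, reusing the constants and imports from the quoted diff (the `hasKerberosCredentials()` check, and the keytab-copying body, are assumptions about the intended fix):
    
    ```java
    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
        // Skip the keytab login when credentials are already present,
        // e.g. when AutoTGT has already logged this worker in.
        if (UserGroupInformation.isSecurityEnabled()
                && !UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
            String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
            if (keytab != null) {
                hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
            }
            String principal = (String) conf.get(STORM_USER_NAME_KEY);
            if (principal != null) {
                hdfsConfig.set(STORM_USER_NAME_KEY, principal);
            }
            SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
        }
    }
    ```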


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385606
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    --- End diff --
    
    Can we explicitly mark this as transient?  I don't believe any FileSystem instances are serializable, and transient just makes it clearer that this can only be set after we are deserialized.
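    
    The suggested change is a one-liner in the quoted `Options` class (sketch):
    
    ```java
    // transient: FileSystem is not serializable, and the handle is only
    // re-created after deserialization (e.g. in prepare()).
    protected transient FileSystem fs;
    ```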


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-45091287
  
    I finally made it through all of the code.  It looks good for the most part.  Just a few minor comments.  I am also willing to maintain/support this code. I have a number of customers who I know would be very interested in using this, so I would be on the hook for supporting it anyways :)


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r15555399
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java ---
    @@ -0,0 +1,55 @@
    +package org.apache.storm.hdfs.bolt.rotation;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +public class TimedRotationPolicy implements FileRotationPolicy {
    +
    +    public static enum TimeUnit {
    --- End diff --
    
    Nothing more than personal preference. I just didn't like the j.u.c. TimeUnit API in this context. I thought it was more intuitive to allow users to specify durations as fractions (e.g., 1.5 hours).
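    
    A usage sketch of the fractional-duration style described here (the constructor shape is inferred from the quoted class, so treat it as illustrative):
    
    ```java
    // Rotate files every 1.5 hours, a duration that is awkward to express
    // with java.util.concurrent.TimeUnit.
    FileRotationPolicy rotationPolicy =
            new TimedRotationPolicy(1.5f, TimedRotationPolicy.TimeUnit.HOURS);
    ```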


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-50542598
  
    The changes look good to me and I am +1 for merging this in.  I do have one comment about using a custom TimeUnit instead of the standard one, but it is very minor.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13204413
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -0,0 +1,342 @@
    +# Storm HDFS
    +
    +Storm components for interacting with HDFS file systems
    +
    +
    +## Usage
    +The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
    +1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
    +reach 5 megabytes in size.
    +
    +```java
    +// use "|" instead of "," for field delimiter
    +RecordFormat format = new DelimitedRecordFormat()
    +        .withFieldDelimiter("|");
    +
    +// sync the filesystem after every 1k tuples
    +SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    +
    +// rotate files when they reach 5MB
    +FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    +
    +FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    +        .withPath("/foo/");
    +
    +HdfsBolt bolt = new HdfsBolt()
    +        .withFsUrl("hdfs://localhost:54310")
    +        .withFileNameFormat(fileNameFormat)
    +        .withRecordFormat(format)
    +        .withRotationPolicy(rotationPolicy)
    +        .withSyncPolicy(syncPolicy);
    +```
    +
    +### Packaging a Topology
    +When packaging your topology, it's important that you use the [maven-shade-plugin]() as opposed to the
    +[maven-assempbly-plugin]().
    --- End diff --
    
    s/assempbly/assembly/



---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205686
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +    private Path currentFile;
    +    protected OutputCollector collector;
    +    protected FileSystem fs;
    +    protected SyncPolicy syncPolicy;
    +    protected FileRotationPolicy rotationPolicy;
    +    protected FileNameFormat fileNameFormat;
    +    protected int rotation = 0;
    +    protected String fsUrl;
    +    protected String configKey;
    +//    protected String path;
    --- End diff --
    
    yep.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r14109548
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    --- End diff --
    
    I thought about making 'rotationAction' a single instance as opposed to a list, and providing a MultiRotationAction implementation that essentially does the same thing.
    
    That seemed like overkill to me so I kept it simple.
    
    The idea behind allowing more than one is basically "who knows what people will do with this, so let's give them freedom, and hope they don't shoot themselves in the foot."
    
    Right now, rotation actions run in-line with tuple processing, so they need to be kept light and fast depending on the settings.
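    
    For anyone writing one, a minimal sketch of a custom action (the `execute(FileSystem, Path)` signature matches the calls in the quoted `rotateOutputFile()`; the class name and move-to-directory logic are illustrative):
    
    ```java
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.rotation.RotationAction;
    
    public class MoveToDoneDirAction implements RotationAction {
        private final String destDir;
    
        public MoveToDoneDirAction(String destDir) {
            this.destDir = destDir;
        }
    
        @Override
        public void execute(FileSystem fileSystem, Path filePath) throws IOException {
            // A single HDFS rename is cheap, so this is safe to run in-line
            // with tuple processing.
            fileSystem.rename(filePath, new Path(destDir, filePath.getName()));
        }
    }
    ```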
    



---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r14109654
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    --- End diff --
    
    +1 No problem.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-47304373
  
    Just to make anyone interested aware, this pull request is a git subtree merge of the master branch of this repository:
    
    https://github.com/ptgoetz/storm-hdfs
    
    There's additional activity there specifically related to the security branch.
    



---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r14109580
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---
    @@ -0,0 +1,146 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.format.SequenceFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.Map;
    +
    +public class SequenceFileBolt extends AbstractHdfsBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class);
    +
    +    private SequenceFormat format;
    +    private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
    +    private SequenceFile.Writer writer;
    +
    +    private String compressionCodec = "default";
    +    private transient CompressionCodecFactory codecFactory;
    +
    +    public SequenceFileBolt() {
    +    }
    +
    +    public SequenceFileBolt withCompressionCodec(String codec){
    +        this.compressionCodec = codec;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFsUrl(String fsUrl) {
    +        this.fsUrl = fsUrl;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withConfigKey(String configKey){
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFileNameFormat(FileNameFormat fileNameFormat) {
    +        this.fileNameFormat = fileNameFormat;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSequenceFormat(SequenceFormat format) {
    +        this.format = format;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSyncPolicy(SyncPolicy syncPolicy) {
    +        this.syncPolicy = syncPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withRotationPolicy(FileRotationPolicy rotationPolicy) {
    +        this.rotationPolicy = rotationPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType){
    +        this.compressionType = compressionType;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt addRotationAction(RotationAction action){
    +        this.rotationActions.add(action);
    +        return this;
    +    }
    +
    +    @Override
    +    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
    +        LOG.info("Preparing Sequence File Bolt...");
    +        if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
    +
    +        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +        this.codecFactory = new CompressionCodecFactory(hdfsConfig);
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            this.writer.append(this.format.key(tuple), this.format.value(tuple));
    +            long offset = this.writer.getLength();
    +            this.collector.ack(tuple);
    +
    +            if (this.syncPolicy.mark(tuple, offset)) {
    +                this.writer.hsync();
    --- End diff --
    
    I'm okay with targeting 2.x. I'll update the docs.
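    
    For context, the 2.x requirement comes from the `hsync()` call in the quoted `execute()` (a hedged note; the 1.x detail is from memory):
    
    ```java
    // SequenceFile.Writer.hsync() flushes data to the datanodes' disks and
    // only exists on the Hadoop 2.x API; 1.x offered the weaker syncFs().
    writer.append(format.key(tuple), format.value(tuple));
    writer.hsync();
    ```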


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205400
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +    private Path currentFile;
    +    protected OutputCollector collector;
    +    protected FileSystem fs;
    +    protected SyncPolicy syncPolicy;
    +    protected FileRotationPolicy rotationPolicy;
    +    protected FileNameFormat fileNameFormat;
    +    protected int rotation = 0;
    +    protected String fsUrl;
    +    protected String configKey;
    +//    protected String path;
    --- End diff --
    
    remove?


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385705
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    +        private Path currentFile;
    +        protected FileRotationPolicy rotationPolicy;
    +        protected FileNameFormat fileNameFormat;
    +        protected int rotation = 0;
    +        protected Configuration hdfsConfig;
    +        protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +
    +        abstract void closeOutputFile() throws IOException;
    +
    +        abstract Path createOutputFile() throws IOException;
    +
    +        abstract void execute(List<TridentTuple> tuples) throws IOException;
    +
    +        abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
    +
    +        protected void rotateOutputFile() throws IOException {
    +            LOG.info("Rotating output file...");
    +            long start = System.currentTimeMillis();
    +            closeOutputFile();
    +            this.rotation++;
    +
    +            Path newFile = createOutputFile();
    +            LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
    +            for(RotationAction action : this.rotationActions){
    +                action.execute(this.fs, this.currentFile);
    +            }
    +            this.currentFile = newFile;
    +            long time = System.currentTimeMillis() - start;
    +            LOG.info("File rotation took {} ms.", time);
    +
    +
    +        }
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions){
    +            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
    +            if (this.fsUrl == null) {
    +                throw new IllegalStateException("File system URL must be specified.");
    +            }
    +            this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
    +            this.hdfsConfig = new Configuration();
    +            Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
    +            if(map != null){
    +                for(String key : map.keySet()){
    +                    this.hdfsConfig.set(key, String.valueOf(map.get(key)));
    +                }
    +            }
    +            try{
    +                HdfsSecurityUtil.login(conf, hdfsConfig);
    +                doPrepare(conf, partitionIndex, numPartitions);
    +                this.currentFile = createOutputFile();
    +
    +            } catch (Exception e){
    +                throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
    +            }
    +        }
    +
    +    }
    +
    +    public static class HdfsFileOptions extends Options {
    +
    +        private FSDataOutputStream out;
    --- End diff --
    
    transient


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13207661
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -0,0 +1,342 @@
    +# Storm HDFS
    +
    --- End diff --
    
    Fair enough, it is just that a lot of the configs are really hard to get right, especially for security, unless you copy the configs from a gateway.
    
    Also, I could be wrong about this, but in our experience enough of Hadoop and its RPC layer is static that you cannot talk to two different clusters that are configured incompatibly.  But a client can talk to many different compatible clusters.  Either way, we can adjust the documentation as we get more experience with users trying to do this.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205555
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -0,0 +1,342 @@
    +# Storm HDFS
    +
    --- End diff --
    
    I will update the documentation, but it's best to avoid using core-site.xml and hdfs-site.xml so you can have multiple bolts in a topology that talk to different HDFS clusters.
    
    You do this by configuring the bolt with a "configKey" that points to an entry in the Storm configuration map. Any properties under that key will be added to the Hadoop `Configuration` object.
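    
    A sketch of what that looks like from the topology side (the config key name and property values are illustrative):
    
    ```java
    Config conf = new Config();
    Map<String, Object> hdfsProps = new HashMap<String, Object>();
    hdfsProps.put("dfs.replication", "3");       // overrides hdfs-site.xml
    conf.put("hdfs.config.clusterA", hdfsProps); // per-bolt Hadoop settings
    
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl("hdfs://cluster-a:8020")
            .withConfigKey("hdfs.config.clusterA");
    // A second bolt could use a different key to talk to another cluster.
    ```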


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205800
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +    private Path currentFile;
    +    protected OutputCollector collector;
    +    protected FileSystem fs;
    +    protected SyncPolicy syncPolicy;
    +    protected FileRotationPolicy rotationPolicy;
    +    protected FileNameFormat fileNameFormat;
    +    protected int rotation = 0;
    +    protected String fsUrl;
    +    protected String configKey;
    +//    protected String path;
    +
    +    protected Configuration hdfsConfig;
    +
    +    protected void rotateOutputFile() throws IOException {
    +        LOG.info("Rotating output file...");
    +        long start = System.currentTimeMillis();
    +        closeOutputFile();
    +        this.rotation++;
    +
    +        Path newFile = createOutputFile();
    +        LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
    +        for(RotationAction action : this.rotationActions){
    +            action.execute(this.fs, this.currentFile);
    +        }
    +        this.currentFile = newFile;
    +        long time = System.currentTimeMillis() - start;
    +        LOG.info("File rotation took {} ms.", time);
    +    }
    +
    +    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
    --- End diff --
    
    It might be good to mark this as final and add some javadocs so that it is obvious that doPrepare is the extension point and not prepare.
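    
    A minimal sketch of the suggested pattern (an illustrative class, not the actual bolt):
    
    ```java
    public abstract class TemplateBolt {
        /**
         * Final so it is obvious that {@link #doPrepare} is the extension
         * point, not prepare itself.
         */
        public final void prepare() {
            // ...common setup shared by all subclasses (configs, login)...
            doPrepare();
        }
    
        /** Subclass hook: open streams, writers, codecs, etc. */
        protected abstract void doPrepare();
    }
    ```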


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-44587334
  
    I should also add that I listed myself as a committer-sponsor. If there are other committers that are willing to act as sponsors, let me know and I'll add you to the list.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r15555532
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java ---
    @@ -0,0 +1,55 @@
    +package org.apache.storm.hdfs.bolt.rotation;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +public class TimedRotationPolicy implements FileRotationPolicy {
    +
    +    public static enum TimeUnit {
    --- End diff --
    
    Fair enough. I am still +1 on merging the code in.


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/incubator-storm/pull/128#issuecomment-51527392
  
    I'm +1
    
    Any other committers care to review?


---

[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385421
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common.security;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.SecurityUtil;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class provides util methods for storm-hdfs connector communicating
    + * with secured HDFS.
    + */
    +public class HdfsSecurityUtil {
    +    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    +    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    +
    +    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
    +        if (UserGroupInformation.isSecurityEnabled()) {
    --- End diff --
    
    Following up on our previous discussion about shipping configs packaged in the jar vs. as part of the topology conf and then populating a Hadoop Configuration: this is one of those places where some of the config really has to be on the classpath and cannot be part of the topology conf. UserGroupInformation internally creates a new Configuration that only searches the classpath and pulls in the configs it finds there, and it then uses that config to decide whether security is enabled. If you don't have a core-site.xml configured for security on the classpath, it will never think security is turned on.
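    
    A minimal sketch of the behavior being described (the property name is standard Hadoop; the rest is illustrative):
    
    ```java
    import org.apache.hadoop.security.UserGroupInformation;
    
    // UserGroupInformation builds its own Configuration from the classpath
    // (core-site.xml and friends); nothing from the topology conf reaches it.
    // isSecurityEnabled() returns true only if a classpath core-site.xml sets
    // hadoop.security.authentication=kerberos.
    if (!UserGroupInformation.isSecurityEnabled()) {
        // With no (or an insecure) core-site.xml on the classpath, any Kerberos
        // login logic guarded by this check silently becomes a no-op.
    }
    ```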



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13204819
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -0,0 +1,342 @@
    +# Storm HDFS
    +
    --- End diff --
    
    In general you might want to mention something about including Hadoop configuration files like core-site.xml and hdfs-site.xml, because if those files do not match the configs on the Hadoop cluster, a number of errors can occur. This is especially true if you try to run with security turned on.
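    
    One way to make sure the client sees the cluster's settings is to load those files explicitly; a sketch, where the paths are assumptions about the cluster's install layout:
    
    ```java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    
    Configuration hdfsConfig = new Configuration();
    // Pull in the same files the cluster uses; mismatches between these and the
    // cluster's actual settings are what produce the errors described above.
    hdfsConfig.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    hdfsConfig.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
    ```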



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-storm/pull/128



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205891
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +    private Path currentFile;
    +    protected OutputCollector collector;
    +    protected FileSystem fs;
    +    protected SyncPolicy syncPolicy;
    +    protected FileRotationPolicy rotationPolicy;
    +    protected FileNameFormat fileNameFormat;
    +    protected int rotation = 0;
    +    protected String fsUrl;
    +    protected String configKey;
    +//    protected String path;
    +
    +    protected Configuration hdfsConfig;
    +
    +    protected void rotateOutputFile() throws IOException {
    +        LOG.info("Rotating output file...");
    +        long start = System.currentTimeMillis();
    +        closeOutputFile();
    +        this.rotation++;
    +
    +        Path newFile = createOutputFile();
    +        LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
    +        for(RotationAction action : this.rotationActions){
    +            action.execute(this.fs, this.currentFile);
    +        }
    +        this.currentFile = newFile;
    +        long time = System.currentTimeMillis() - start;
    +        LOG.info("File rotation took {} ms.", time);
    +    }
    +
    +    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
    --- End diff --
    
    Good point.



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13235820
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---
    @@ -0,0 +1,146 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.format.SequenceFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.Map;
    +
    +public class SequenceFileBolt extends AbstractHdfsBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class);
    +
    +    private SequenceFormat format;
    +    private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
    +    private SequenceFile.Writer writer;
    +
    +    private String compressionCodec = "default";
    +    private transient CompressionCodecFactory codecFactory;
    +
    +    public SequenceFileBolt() {
    +    }
    +
    +    public SequenceFileBolt withCompressionCodec(String codec){
    +        this.compressionCodec = codec;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFsUrl(String fsUrl) {
    +        this.fsUrl = fsUrl;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withConfigKey(String configKey){
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFileNameFormat(FileNameFormat fileNameFormat) {
    +        this.fileNameFormat = fileNameFormat;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSequenceFormat(SequenceFormat format) {
    +        this.format = format;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSyncPolicy(SyncPolicy syncPolicy) {
    +        this.syncPolicy = syncPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withRotationPolicy(FileRotationPolicy rotationPolicy) {
    +        this.rotationPolicy = rotationPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType){
    +        this.compressionType = compressionType;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt addRotationAction(RotationAction action){
    +        this.rotationActions.add(action);
    +        return this;
    +    }
    +
    +    @Override
    +    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
    +        LOG.info("Preparing Sequence File Bolt...");
    +        if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
    +
    +        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +        this.codecFactory = new CompressionCodecFactory(hdfsConfig);
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            this.writer.append(this.format.key(tuple), this.format.value(tuple));
    +            long offset = this.writer.getLength();
    +            this.collector.ack(tuple);
    +
    +            if (this.syncPolicy.mark(tuple, offset)) {
    +                this.writer.hsync();
    +                this.syncPolicy.reset();
    +            }
    +            if (this.rotationPolicy.mark(tuple, offset)) {
    +                rotateOutputFile();
    +                this.rotationPolicy.reset();
    +            }
    +        } catch (IOException e) {
    +            LOG.warn("write/sync failed.", e);
    +            this.collector.fail(tuple);
    +        }
    +
    +    }
    +
    +    Path createOutputFile() throws IOException {
    +        Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    +        this.writer = SequenceFile.createWriter(
    +                this.hdfsConfig,
    +                SequenceFile.Writer.file(p),
    +                SequenceFile.Writer.keyClass(this.format.keyClass()),
    +                SequenceFile.Writer.valueClass(this.format.valueClass()),
    +                SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec))
    +        );
    +        return p;
    +    }
    +
    +    void closeOutputFile() throws IOException {
    +        this.writer.hsync();
    --- End diff --
    
    I'm not sure this is needed. close() is supposed to block until the data is in HDFS, not necessarily on disk.
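    
    In other words, assuming close() alone gives the durability guarantee the bolt needs, the method sketched would shrink to:
    
    ```java
    void closeOutputFile() throws IOException {
        // close() blocks until the data has been handed off to HDFS; an explicit
        // hsync() beforehand only adds a force-to-disk on the datanodes.
        this.writer.close();
    }
    ```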



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r15554077
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java ---
    @@ -0,0 +1,55 @@
    +package org.apache.storm.hdfs.bolt.rotation;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +public class TimedRotationPolicy implements FileRotationPolicy {
    +
    +    public static enum TimeUnit {
    --- End diff --
    
    Sorry, commenting in the right place now. Why not use java.util.concurrent.TimeUnit instead?
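    
    Sketched, that suggestion might look like the following (the mark()-based timing and field names are illustrative, not the merged implementation):
    
    ```java
    package org.apache.storm.hdfs.bolt.rotation;
    
    import java.util.concurrent.TimeUnit;
    
    import backtype.storm.tuple.Tuple;
    
    public class TimedRotationPolicy implements FileRotationPolicy {
        private final long intervalMillis;
        private long lastRotation = System.currentTimeMillis();
    
        // Reuse the JDK's TimeUnit rather than defining a parallel enum.
        public TimedRotationPolicy(long interval, TimeUnit unit) {
            this.intervalMillis = unit.toMillis(interval);
        }
    
        @Override
        public boolean mark(Tuple tuple, long offset) {
            return System.currentTimeMillis() - lastRotation >= intervalMillis;
        }
    
        @Override
        public void reset() {
            this.lastRotation = System.currentTimeMillis();
        }
    }
    ```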



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385682
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    +        private Path currentFile;
    +        protected FileRotationPolicy rotationPolicy;
    +        protected FileNameFormat fileNameFormat;
    +        protected int rotation = 0;
    +        protected Configuration hdfsConfig;
    +        protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    +
    +        abstract void closeOutputFile() throws IOException;
    +
    +        abstract Path createOutputFile() throws IOException;
    +
    +        abstract void execute(List<TridentTuple> tuples) throws IOException;
    +
    +        abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
    +
    +        protected void rotateOutputFile() throws IOException {
    +            LOG.info("Rotating output file...");
    +            long start = System.currentTimeMillis();
    +            closeOutputFile();
    +            this.rotation++;
    +
    +            Path newFile = createOutputFile();
    +            LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
    +            for(RotationAction action : this.rotationActions){
    +                action.execute(this.fs, this.currentFile);
    +            }
    +            this.currentFile = newFile;
    +            long time = System.currentTimeMillis() - start;
    +            LOG.info("File rotation took {} ms.", time);
    +
    +
    +        }
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions){
    +            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
    +            if (this.fsUrl == null) {
    +                throw new IllegalStateException("File system URL must be specified.");
    +            }
    +            this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
    +            this.hdfsConfig = new Configuration();
    +            Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
    +            if(map != null){
    +                for(String key : map.keySet()){
    +                    this.hdfsConfig.set(key, String.valueOf(map.get(key)));
    +                }
    +            }
    +            try{
    +                HdfsSecurityUtil.login(conf, hdfsConfig);
    +                doPrepare(conf, partitionIndex, numPartitions);
    +                this.currentFile = createOutputFile();
    +
    +            } catch (Exception e){
    +                throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
    +            }
    +        }
    +
    +    }
    +
    +    public static class HdfsFileOptions extends Options {
    +
    +        private FSDataOutputStream out;
    +        protected RecordFormat format;
    +        private long offset = 0;
    +
    +        public HdfsFileOptions withFsUrl(String fsUrl){
    +            this.fsUrl = fsUrl;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withConfigKey(String configKey){
    +            this.configKey = configKey;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat){
    +            this.fileNameFormat = fileNameFormat;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withRecordFormat(RecordFormat format){
    +            this.format = format;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy){
    +            this.rotationPolicy = rotationPolicy;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions addRotationAction(RotationAction action){
    +            this.rotationActions.add(action);
    +            return this;
    +        }
    +
    +        @Override
    +        void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
    +            LOG.info("Preparing HDFS Bolt...");
    +            this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +        }
    +
    +        @Override
    +        void closeOutputFile() throws IOException {
    +            this.out.close();
    +        }
    +
    +        @Override
    +        Path createOutputFile() throws IOException {
    +            Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    +            this.out = this.fs.create(path);
    +            return path;
    +        }
    +
    +        @Override
    +        public void execute(List<TridentTuple> tuples) throws IOException {
    +            boolean rotated = false;
    +            for(TridentTuple tuple : tuples){
    +                byte[] bytes = this.format.format(tuple);
    +                out.write(bytes);
    +                this.offset += bytes.length;
    +
    +                if(this.rotationPolicy.mark(tuple, this.offset)){
    +                    rotateOutputFile();
    +                    this.offset = 0;
    +                    this.rotationPolicy.reset();
    +                    rotated = true;
    +                }
    +            }
    +            if(!rotated){
    +                if(this.out instanceof HdfsDataOutputStream){
    +                    ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
    +                } else {
    +                    this.out.hsync();
    +                }
    +            }
    +        }
    +    }
    +
    +    public static class SequenceFileOptions extends Options {
    +        private SequenceFormat format;
    +        private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
    +        private SequenceFile.Writer writer;
    --- End diff --
    
    transient
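    
    Presumably because `SequenceFile.Writer` wraps a live output stream and is not serializable. Sketched:
    
    ```java
    // Excluded from serialization of the Options object; the writer is
    // recreated on the worker when the output file is opened.
    private transient SequenceFile.Writer writer;
    ```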



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205662
  
    --- Diff: external/storm-hdfs/pom.xml ---
    @@ -0,0 +1,68 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>0.9.2-incubating-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +
    +    <artifactId>storm-hdfs</artifactId>
    +
    +    <developers>
    +        <developer>
    +            <id>ptgoetz</id>
    +            <name>P. Taylor Goetz</name>
    +            <email>ptgoetz@gmail.com</email>
    +        </developer>
    +    </developers>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>0.9.1-incubating</version>
    --- End diff --
    
    Good catch. Actually, it should probably be `${project.version}`.
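    
    Concretely, a sketch of the agreed fix:
    
    ```xml
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <!-- track the parent version instead of pinning 0.9.1-incubating -->
        <version>${project.version}</version>
    </dependency>
    ```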



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205332
  
    --- Diff: external/storm-hdfs/pom.xml ---
    @@ -0,0 +1,68 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>0.9.2-incubating-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +
    +    <artifactId>storm-hdfs</artifactId>
    +
    +    <developers>
    +        <developer>
    +            <id>ptgoetz</id>
    +            <name>P. Taylor Goetz</name>
    +            <email>ptgoetz@gmail.com</email>
    +        </developer>
    +    </developers>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>0.9.1-incubating</version>
    --- End diff --
    
    s/0.9.1/0.9.2/



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385659
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    +        private Path currentFile;
    +        protected FileRotationPolicy rotationPolicy;
    +        protected FileNameFormat fileNameFormat;
    +        protected int rotation = 0;
    +        protected Configuration hdfsConfig;
    --- End diff --
    
    transient too.
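    
    That is, the same treatment as the writer field, sketched:
    
    ```java
    protected transient Configuration hdfsConfig;
    ```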



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13236143
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +public abstract class AbstractHdfsBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
    +
    +    protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
    --- End diff --
    
    If the intention of a RotationAction is primarily to move/rename a file, then I am not really sure that having multiple of them makes a lot of sense. I can see having other actions, like sending an event that rotation has finished, but I personally would rather represent that as a RotationAction that wraps another one, just so it has more control over the order in which operations, like a move, happen.
    
    This is not critical, just curious for your thoughts on this.
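    
    Sketched, the wrapping approach could look like this (NotifyingAction is hypothetical, not part of the pull request):
    
    ```java
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.rotation.RotationAction;
    
    // Hypothetical wrapper: runs an inner action first, then signals completion,
    // giving one place to control the order of the move and the notification.
    public class NotifyingAction implements RotationAction {
        private final RotationAction delegate;
    
        public NotifyingAction(RotationAction delegate) {
            this.delegate = delegate;
        }
    
        @Override
        public void execute(FileSystem fileSystem, Path filePath) throws IOException {
            delegate.execute(fileSystem, filePath); // e.g. a MoveFileAction runs first
            // ...then emit the "rotation finished" event here
        }
    }
    ```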



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13205187
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -0,0 +1,342 @@
    +# Storm HDFS
    +
    +Storm components for interacting with HDFS file systems
    +
    +
    +## Usage
    +The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
    +1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
    +reach 5 megabytes in size.
    +
    +```java
    +// use "|" instead of "," for field delimiter
    +RecordFormat format = new DelimitedRecordFormat()
    +        .withFieldDelimiter("|");
    +
    +// sync the filesystem after every 1k tuples
    +SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    +
    +// rotate files when they reach 5MB
    +FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    +
    +FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    +        .withPath("/foo/");
    +
    +HdfsBolt bolt = new HdfsBolt()
    +        .withFsUrl("hdfs://localhost:54310")
    +        .withFileNameFormat(fileNameFormat)
    +        .withRecordFormat(format)
    +        .withRotationPolicy(rotationPolicy)
    +        .withSyncPolicy(syncPolicy);
    +```
    +
    +### Packaging a Topology
    +When packaging your topology, it's important that you use the [maven-shade-plugin]() as opposed to the
    +[maven-assembly-plugin]().
    +
    +The shade plugin provides facilities for merging JAR manifest entries, which the Hadoop client leverages for URL scheme
    +resolution.
    +
    +If you experience errors such as the following:
    +
    +```
    +java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
    +```
    +
    +it's an indication that your topology jar file isn't packaged properly.
    +
    +If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
    +create your topology jar:
    +
    +```xml
    +<plugin>
    +    <groupId>org.apache.maven.plugins</groupId>
    +    <artifactId>maven-shade-plugin</artifactId>
    +    <version>1.4</version>
    +    <configuration>
    +        <createDependencyReducedPom>true</createDependencyReducedPom>
    +    </configuration>
    +    <executions>
    +        <execution>
    +            <phase>package</phase>
    +            <goals>
    +                <goal>shade</goal>
    +            </goals>
    +            <configuration>
    +                <transformers>
    +                    <transformer
    +                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    +                    <transformer
    +                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    +                        <mainClass></mainClass>
    +                    </transformer>
    +                </transformers>
    +            </configuration>
    +        </execution>
    +    </executions>
    +</plugin>
    +
    +```
    +
    +### Specifying a Hadoop Version
    +By default, storm-hdfs uses the following Hadoop dependencies:
    +
    +```xml
    +<dependency>
    +    <groupId>org.apache.hadoop</groupId>
    +    <artifactId>hadoop-client</artifactId>
    +    <version>2.2.0</version>
    +    <exclusions>
    +        <exclusion>
    +            <groupId>org.slf4j</groupId>
    +            <artifactId>slf4j-log4j12</artifactId>
    +        </exclusion>
    +    </exclusions>
    +</dependency>
    +<dependency>
    +    <groupId>org.apache.hadoop</groupId>
    +    <artifactId>hadoop-hdfs</artifactId>
    +    <version>2.2.0</version>
    +    <exclusions>
    +        <exclusion>
    +            <groupId>org.slf4j</groupId>
    +            <artifactId>slf4j-log4j12</artifactId>
    +        </exclusion>
    +    </exclusions>
    +</dependency>
    +```
    +
    +If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
    +and add the dependencies for your preferred version in your pom.
    +
    +Hadoop client version incompatibilities can manifest as errors like:
    +
    +```
    +com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
    +```
    +
    +## Customization
    +
    +### Record Formats
    +Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
    +interface:
    +
    +```java
    +public interface RecordFormat extends Serializable {
    +    byte[] format(Tuple tuple);
    +}
    +```
    +
    +The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
    +tab-delimited files.
    +
    +
    +### File Naming
    +File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
    +interface:
    +
    +```java
    +public interface FileNameFormat extends Serializable {
    +    void prepare(Map conf, TopologyContext topologyContext);
    +    String getName(long rotation, long timeStamp);
    +    String getPath();
    +}
    +```
    +
    +The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat`  will create file names with the following format:
    +
    +     {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
    +
    +For example:
    +
    +     MyBolt-5-7-1390579837830.txt
    +
    +By default, the prefix is empty and the extension is ".txt".
    +
    +
    +
    +### Sync Policies
    +Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
    +to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
    +
    +```java
    +public interface SyncPolicy extends Serializable {
    +    boolean mark(Tuple tuple, long offset);
    +    void reset();
    +}
    +```
    +The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
    +to perform a sync/flush, after which it will call the `reset()` method.
    +
    +The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
    +been processed.
    +
    +### File Rotation Policies
    +Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
    +implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
    +
    +```java
    +public interface FileRotationPolicy extends Serializable {
    +    boolean mark(Tuple tuple, long offset);
    +    void reset();
    +}
    +``` 
    +
    +The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
    +data files reach a specific file size:
    +
    +```java
    +FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    +```
    +
    +### File Rotation Actions
    +Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
    +A `RotationAction` provides a hook for performing some action right after a file is rotated, for example moving a
    +file to a different location or renaming it.
    +
    +
    +```java
    +public interface RotationAction extends Serializable {
    +    void execute(FileSystem fileSystem, Path filePath) throws IOException;
    +}
    +```
    +
    +Storm-HDFS includes a simple action that will move a file after rotation:
    +
    +```java
    +public class MoveFileAction implements RotationAction {
    +    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
    +
    +    private String destination;
    +
    +    public MoveFileAction withDestination(String destDir){
    +        destination = destDir;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
    +        Path destPath = new Path(destination, filePath.getName());
    +        LOG.info("Moving file {} to {}", filePath, destPath);
    +        boolean success = fileSystem.rename(filePath, destPath);
    +        return;
    +    }
    +}
    +```
    +
    +If you are using Trident and sequence files you can do something like this:
    +
    +```java
    +        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
    +                .withFileNameFormat(fileNameFormat)
    +                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
    +                .withRotationPolicy(rotationPolicy)
    +                .withFsUrl("hdfs://localhost:54310")
    +                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
    +```
    +
    +
    +## Support for HDFS Sequence Files
    +
    +The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write Storm data to HDFS sequence files:
    +
    +```java
    +        // sync the filesystem after every 1k tuples
    +        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    +
    +        // rotate files when they reach 5MB
    +        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    +
    +        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    +                .withExtension(".seq")
    +                .withPath("/data/");
    +
    +        // create sequence format instance.
    +        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
    +
    +        SequenceFileBolt bolt = new SequenceFileBolt()
    +                .withFsUrl("hdfs://localhost:54310")
    +                .withFileNameFormat(fileNameFormat)
    +                .withSequenceFormat(format)
    +                .withRotationPolicy(rotationPolicy)
    +                .withSyncPolicy(syncPolicy)
    +                .withCompressionType(SequenceFile.CompressionType.RECORD)
    +                .withCompressionCodec("deflate");
    --- End diff --
    
    The deflate codec just reminded me: now that JNI support is available in Storm, it might be nice to include instructions on how to package libhadoop.so so that Storm/Hadoop can pick it up. It can be a huge speed improvement over the pure-Java compression/CRC implementations.
    
    It might even be worth doing some of that on behalf of the user, but that would probably require some coordination with Hadoop itself so that the native libraries are packaged in a way that lets us pull them in.
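    
    On the user-facing side, a rough sketch (topology.worker.childopts is a real Storm config; the library path is an assumption about the install layout):
    
    ```java
    import backtype.storm.Config;
    
    Config conf = new Config();
    // Point worker JVMs at the directory containing libhadoop.so so Hadoop can
    // load its native compression/CRC implementations.
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
            "-Djava.library.path=/usr/lib/hadoop/lib/native");
    ```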



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13384972
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common.security;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.SecurityUtil;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class provides util methods for storm-hdfs connector communicating
    + * with secured HDFS.
    + */
    +public class HdfsSecurityUtil {
    +    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    +    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    --- End diff --
    
    I would prefer to see Hadoop in the name of these configs. At first glance they feel like they apply to all of Storm, but they do not, and we took a very different route to configuration in the security branch, so adopting them is a bit difficult.
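    
    For example, names along these lines would make the scope obvious (the exact strings are illustrative, not necessarily what was merged):
    
    ```java
    public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
    public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
    ```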



[GitHub] incubator-storm pull request: STORM-211: Add module for HDFS integ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13235725
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---
    @@ -0,0 +1,146 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    +import org.apache.storm.hdfs.bolt.format.SequenceFormat;
    +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.Map;
    +
    +public class SequenceFileBolt extends AbstractHdfsBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class);
    +
    +    private SequenceFormat format;
    +    private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
    +    private SequenceFile.Writer writer;
    +
    +    private String compressionCodec = "default";
    +    private transient CompressionCodecFactory codecFactory;
    +
    +    public SequenceFileBolt() {
    +    }
    +
    +    public SequenceFileBolt withCompressionCodec(String codec){
    +        this.compressionCodec = codec;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFsUrl(String fsUrl) {
    +        this.fsUrl = fsUrl;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withConfigKey(String configKey){
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withFileNameFormat(FileNameFormat fileNameFormat) {
    +        this.fileNameFormat = fileNameFormat;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSequenceFormat(SequenceFormat format) {
    +        this.format = format;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withSyncPolicy(SyncPolicy syncPolicy) {
    +        this.syncPolicy = syncPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withRotationPolicy(FileRotationPolicy rotationPolicy) {
    +        this.rotationPolicy = rotationPolicy;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType){
    +        this.compressionType = compressionType;
    +        return this;
    +    }
    +
    +    public SequenceFileBolt addRotationAction(RotationAction action){
    +        this.rotationActions.add(action);
    +        return this;
    +    }
    +
    +    @Override
    +    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
    +        LOG.info("Preparing Sequence File Bolt...");
    +        if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
    +
    +        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +        this.codecFactory = new CompressionCodecFactory(hdfsConfig);
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            this.writer.append(this.format.key(tuple), this.format.value(tuple));
    +            long offset = this.writer.getLength();
    +            this.collector.ack(tuple);
    +
    +            if (this.syncPolicy.mark(tuple, offset)) {
    +                this.writer.hsync();
    --- End diff --
    
    hsync is Hadoop 2.x-specific. If we are going to restrict ourselves to that, I am OK, but we may want to document it in the section about version-incompatibility errors.

