Posted to dev@storm.apache.org by vesense <gi...@git.apache.org> on 2016/11/15 08:22:03 UTC

[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/1778

    [STORM-2082][SQL] add sql external module storm-sql-hdfs

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm STORM-2082

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1778.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1778
    
----
commit c9c565464be732de4a4035bd6ead85ccf8204949
Author: Xin Wang <be...@163.com>
Date:   2016-11-13T10:25:27Z

    [STORM-2082][SQL] add sql external module storm-sql-hdfs

----



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r87985832
  
    --- Diff: external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.sql.hdfs;
    +
    +import org.apache.storm.hdfs.trident.HdfsState;
    +import org.apache.storm.hdfs.trident.HdfsStateFactory;
    +import org.apache.storm.hdfs.trident.HdfsUpdater;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    +import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
    +import org.apache.storm.sql.runtime.DataSource;
    +import org.apache.storm.sql.runtime.DataSourcesProvider;
    +import org.apache.storm.sql.runtime.FieldInfo;
    +import org.apache.storm.sql.runtime.IOutputSerializer;
    +import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    +import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
    +import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
    +import org.apache.storm.sql.runtime.utils.SerdeUtils;
    +import org.apache.storm.trident.spout.ITridentDataSource;
    +import org.apache.storm.trident.state.StateFactory;
    +import org.apache.storm.trident.state.StateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +
    +import java.net.URI;
    +import java.util.List;
    +import java.util.Properties;
    +
    +/**
    + * Create an HDFS sink based on the URI and properties. The URI has the format hdfs://host:port/path-to-file.
    + * The properties are in JSON format and specify, among other things, the name and path of the HDFS file.
    + */
    +public class HdfsDataSourcesProvider implements DataSourcesProvider {
    +
    +  private static class HdfsTridentDataSource implements ISqlTridentDataSource {
    +    private final String url;
    +    private final Properties props;
    +    private final IOutputSerializer serializer;
    +
    +    private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
    +      this.url = url;
    +      this.props = props;
    +      this.serializer = serializer;
    +    }
    +
    +    @Override
    +    public ITridentDataSource getProducer() {
    +      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
    +    }
    +
    +    @Override
    +    public SqlTridentConsumer getConsumer() {
    +      FileNameFormat fileNameFormat = new SimpleFileNameFormat()
    +          .withPath(props.getProperty("hdfs.file.path", "/storm"))
    +          .withName(props.getProperty("hdfs.file.name", "$TIME.$NUM.txt"));
    +
    +      RecordFormat recordFormat = new TridentRecordFormat(serializer);
    +
    +      FileRotationPolicy rotationPolicy;
    +      String size = props.getProperty("hdfs.rotation.size.kb");
    +      if (size != null) {
    +        rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
    +      } else {
    +        float interval = Float.parseFloat(props.getProperty("hdfs.rotation.time.seconds", "600")); // default 600 seconds
    --- End diff --
    
    No default value is taken from storm-hdfs. I will add a preconditions check.
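    
    A minimal sketch of what that check could look like (illustrative only, not necessarily the exact code that gets merged):
    
    ```java
    FileRotationPolicy rotationPolicy;
    String size = props.getProperty("hdfs.rotation.size.kb");
    String interval = props.getProperty("hdfs.rotation.time.seconds");
    if (size != null && interval == null) {
      rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
    } else if (size == null && interval != null) {
      rotationPolicy = new TimedRotationPolicy(Float.parseFloat(interval), TimedRotationPolicy.TimeUnit.SECONDS);
    } else {
      // Require exactly one of the two rotation properties.
      throw new IllegalArgumentException(
          "Exactly one of hdfs.rotation.size.kb and hdfs.rotation.time.seconds must be specified");
    }
    ```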



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    @HeartSaVioR I'm a little busy right now; I will add it later today.



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r90187413
  
    --- Diff: docs/storm-sql-reference.md ---
    @@ -1312,4 +1313,23 @@ You can use below as working reference for `--artifacts` option, and change depe
     
     `org.apache.storm:storm-sql-mongodb:2.0.0-SNAPSHOT,org.apache.storm:storm-mongodb:2.0.0-SNAPSHOT`
     
    -Storing record with preserving fields are not supported for now.
    \ No newline at end of file
    +Storing record with preserving fields are not supported for now.
    +
    +#### HDFS
    +
    +HDFS data source requires below properties to be set:
    +
    +* `hdfs.file.path`: HDFS file path
    +* `hdfs.file.name`: HDFS file name - please refer to [SimpleFileNameFormat]({{page.git-blob-base}}/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java)
    +* `hdfs.rotation.size.kb`: HDFS FileSizeRotationPolicy in KB
    +* `hdfs.rotation.time.seconds`: HDFS TimedRotationPolicy in seconds
    +
    +Please note that only one of `hdfs.rotation.size.kb` and `hdfs.rotation.time.seconds` can be used for HDFS rotation.
    +
    +And note that `storm-sql-hdfs` requires users to provide `storm-hdfs`.
    +You can use below as working reference for `--artifacts` option, and change dependencies version if really needed:
    +
    +`org.apache.storm:storm-sql-hdfs:2.0.0-SNAPSHOT,org.apache.storm:storm-hdfs:2.0.0-SNAPSHOT`
    +
    +Also, hdfs configuration files should be provided.
    +You can put the `core-site.xml` and `hdfs-site.xm` into the `conf` directory.
    --- End diff --
    
    Typo: `hdfs-site.xm` -> `hdfs-site.xml`.
    Also, do you mean putting the files into the `conf` dir in the Storm installation directory? Could you explain how workers can refer to them?



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1778



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    @vesense 
    Please provide `working SQL example statements` to test, and explain how to inject Hadoop resource files.
    Also, please write up content for STORM-2202 (#1777).



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    No problem, please take your time.



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r87973924
  
    --- Diff: external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.sql.hdfs;
    +
    +import org.apache.storm.hdfs.trident.HdfsState;
    +import org.apache.storm.hdfs.trident.HdfsStateFactory;
    +import org.apache.storm.hdfs.trident.HdfsUpdater;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    +import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
    +import org.apache.storm.sql.runtime.DataSource;
    +import org.apache.storm.sql.runtime.DataSourcesProvider;
    +import org.apache.storm.sql.runtime.FieldInfo;
    +import org.apache.storm.sql.runtime.IOutputSerializer;
    +import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    +import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
    +import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
    +import org.apache.storm.sql.runtime.utils.SerdeUtils;
    +import org.apache.storm.trident.spout.ITridentDataSource;
    +import org.apache.storm.trident.state.StateFactory;
    +import org.apache.storm.trident.state.StateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +
    +import java.net.URI;
    +import java.util.List;
    +import java.util.Properties;
    +
    +/**
    + * Create an HDFS sink based on the URI and properties. The URI has the format hdfs://host:port/path-to-file.
    + * The properties are in JSON format and specify, among other things, the name and path of the HDFS file.
    + */
    +public class HdfsDataSourcesProvider implements DataSourcesProvider {
    +
    +  private static class HdfsTridentDataSource implements ISqlTridentDataSource {
    +    private final String url;
    +    private final Properties props;
    +    private final IOutputSerializer serializer;
    +
    +    private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
    +      this.url = url;
    +      this.props = props;
    +      this.serializer = serializer;
    +    }
    +
    +    @Override
    +    public ITridentDataSource getProducer() {
    +      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
    +    }
    +
    +    @Override
    +    public SqlTridentConsumer getConsumer() {
    +      FileNameFormat fileNameFormat = new SimpleFileNameFormat()
    +          .withPath(props.getProperty("hdfs.file.path", "/storm"))
    +          .withName(props.getProperty("hdfs.file.name", "$TIME.$NUM.txt"));
    +
    +      RecordFormat recordFormat = new TridentRecordFormat(serializer);
    +
    +      FileRotationPolicy rotationPolicy;
    +      String size = props.getProperty("hdfs.rotation.size.kb");
    +      if (size != null) {
    +        rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
    +      } else {
    +        float interval = Float.parseFloat(props.getProperty("hdfs.rotation.time.seconds", "600")); // default 600 seconds
    --- End diff --
    
    Is the default value also taken from storm-hdfs? If not, let's require either `hdfs.rotation.size.kb` or `hdfs.rotation.time.seconds`.



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    Thanks for the update. Makes sense.
    +1




[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    @HeartSaVioR Addressed.



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r87976945
  
    --- Diff: external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.sql.hdfs;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Lists;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.apache.storm.hdfs.trident.HdfsState;
    +import org.apache.storm.hdfs.trident.HdfsStateFactory;
    +import org.apache.storm.hdfs.trident.HdfsUpdater;
    +import org.apache.storm.sql.runtime.DataSourcesRegistry;
    +import org.apache.storm.sql.runtime.FieldInfo;
    +import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    +import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
    +import org.apache.storm.trident.state.StateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.ArgumentMatcher;
    +import org.mockito.internal.util.reflection.Whitebox;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +public class TestHdfsDataSourcesProvider {
    +  private static final List<FieldInfo> FIELDS = ImmutableList.of(
    +      new FieldInfo("ID", int.class, true),
    +      new FieldInfo("val", String.class, false));
    +  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
    +  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
    +  private static final Properties TBL_PROPERTIES = new Properties();
    +
    +  private static String hdfsURI;
    +  private static MiniDFSCluster hdfsCluster;
    +
    +  static {
    +    TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
    +    TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
    +    TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
    +  }
    +
    +  @Before
    +  public void setup() throws Exception {
    +    Configuration conf = new Configuration();
    +    conf.set("fs.trash.interval", "10");
    +    conf.setBoolean("dfs.permissions", true);
    +    File baseDir = new File("./target/hdfs/").getAbsoluteFile();
    +    FileUtil.fullyDelete(baseDir);
    +    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
    +
    +    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    +    hdfsCluster = builder.build();
    +    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
    +  }
    +
    +  @After
    +  public void shutDown() throws IOException {
    +    hdfsCluster.shutdown();
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  @Test
    +  public void testHdfsSink() {
    +    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
    +            URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
    +    Assert.assertNotNull(ds);
    +
    +    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
    +
    +    Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
    +    Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
    +
    +    HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
    +    StateUpdater stateUpdater = consumer.getStateUpdater();
    +
    +    HdfsFileOptions options = mock(HdfsFileOptions.class);
    +    FSDataOutputStream out = mock(FSDataOutputStream.class);
    +    Whitebox.setInternalState(state, "options", options);
    +    Whitebox.setInternalState(options, "out", out);
    +
    +    List<TridentTuple> tupleList = mockTupleList();
    +
    +    for (TridentTuple t : tupleList) {
    +      stateUpdater.updateState(state, Collections.singletonList(t), null);
    +      try {
    +        verify(out).write(argThat(new HdfsArgMatcher(t)));
    --- End diff --
    
    @vesense 
    testHdfsSink() is failing consistently. I guess it's because `options` is mocked, so `options.execute()` doesn't do what the real code would. You need to set up `options.execute()` to call the real method, and provide the other fields it needs as well.
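    
    With Mockito that could look roughly like the sketch below (the exact `execute` signature is assumed here; adapt it to the real one):
    
    ```java
    import static org.mockito.Matchers.anyList;
    import static org.mockito.Mockito.doCallRealMethod;
    import static org.mockito.Mockito.mock;
    
    // Inside the test method (declared `throws Exception`):
    HdfsFileOptions options = mock(HdfsFileOptions.class);
    // Let execute() run the real implementation instead of the default no-op stub,
    // so the serialized record actually reaches the mocked output stream.
    doCallRealMethod().when(options).execute(anyList());
    // The real execute() touches internal fields, so they still need to be set,
    // e.g. Whitebox.setInternalState(options, "out", out) as the test already does.
    ```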



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    @HeartSaVioR Updated and squashed.



[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r93867628
  
    --- Diff: docs/storm-sql-reference.md ---
    @@ -1312,4 +1313,23 @@ You can use below as working reference for `--artifacts` option, and change depe
     
     `org.apache.storm:storm-sql-mongodb:2.0.0-SNAPSHOT,org.apache.storm:storm-mongodb:2.0.0-SNAPSHOT`
     
    -Storing record with preserving fields are not supported for now.
    \ No newline at end of file
    +Storing record with preserving fields are not supported for now.
    +
    +#### HDFS
    +
    +HDFS data source requires below properties to be set:
    +
    +* `hdfs.file.path`: HDFS file path
    +* `hdfs.file.name`: HDFS file name - please refer to [SimpleFileNameFormat]({{page.git-blob-base}}/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java)
    +* `hdfs.rotation.size.kb`: HDFS FileSizeRotationPolicy in KB
    +* `hdfs.rotation.time.seconds`: HDFS TimedRotationPolicy in seconds
    +
    +Please note that only one of `hdfs.rotation.size.kb` and `hdfs.rotation.time.seconds` can be used for HDFS rotation.
    +
    +And note that `storm-sql-hdfs` requires users to provide `storm-hdfs`.
    +You can use below as working reference for `--artifacts` option, and change dependencies version if really needed:
    +
    +`org.apache.storm:storm-sql-hdfs:2.0.0-SNAPSHOT,org.apache.storm:storm-hdfs:2.0.0-SNAPSHOT`
    +
    +Also, hdfs configuration files should be provided.
    +You can put the `core-site.xml` and `hdfs-site.xm` into the `conf` directory.
    --- End diff --
    
    Will fix the typo.
    Yes, the `conf` dir is the one in the Storm installation directory. The `conf` dir is added to the classpath by the supervisor's `BasicContainer`; see: https://github.com/apache/storm/blob/2832aeb369d431a2e2f0709175db68f079855387/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L350
    
    Maybe there is a better way to locate the HDFS configuration files. For now, users have to put them on each supervisor node.
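    
    For context on why that works: Hadoop's `Configuration` loads `core-site.xml` (and, once the HDFS classes are on the classpath, `hdfs-site.xml`) from the classpath by default, so anything under the supervisor's `conf` dir becomes visible to workers. A quick way to verify from a worker's classpath (`HdfsConfCheck` is a hypothetical helper, not part of this PR):
    
    ```java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    
    public class HdfsConfCheck {
      public static void main(String[] args) throws Exception {
        // new Configuration() picks up core-site.xml from the classpath;
        // FileSystem.get() then resolves fs.defaultFS from it.
        Configuration conf = new Configuration();
        // Prints e.g. hdfs://localhost:9000 if the site files were found,
        // or the local file:/// default if they were not.
        System.out.println(FileSystem.get(conf).getUri());
      }
    }
    ```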




[GitHub] storm pull request #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1778#discussion_r87985904
  
    --- Diff: external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.sql.hdfs;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Lists;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.apache.storm.hdfs.trident.HdfsState;
    +import org.apache.storm.hdfs.trident.HdfsStateFactory;
    +import org.apache.storm.hdfs.trident.HdfsUpdater;
    +import org.apache.storm.sql.runtime.DataSourcesRegistry;
    +import org.apache.storm.sql.runtime.FieldInfo;
    +import org.apache.storm.sql.runtime.ISqlTridentDataSource;
    +import org.apache.storm.sql.runtime.serde.json.JsonSerializer;
    +import org.apache.storm.trident.state.StateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.ArgumentMatcher;
    +import org.mockito.internal.util.reflection.Whitebox;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
    +
    +public class TestHdfsDataSourcesProvider {
    +  private static final List<FieldInfo> FIELDS = ImmutableList.of(
    +      new FieldInfo("ID", int.class, true),
    +      new FieldInfo("val", String.class, false));
    +  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
    +  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
    +  private static final Properties TBL_PROPERTIES = new Properties();
    +
    +  private static String hdfsURI;
    +  private static MiniDFSCluster hdfsCluster;
    +
    +  static {
    +    TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
    +    TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
    +    TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
    +  }
    +
    +  @Before
    +  public void setup() throws Exception {
    +    Configuration conf = new Configuration();
    +    conf.set("fs.trash.interval", "10");
    +    conf.setBoolean("dfs.permissions", true);
    +    File baseDir = new File("./target/hdfs/").getAbsoluteFile();
    +    FileUtil.fullyDelete(baseDir);
    +    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
    +
    +    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    +    hdfsCluster = builder.build();
    +    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
    +  }
    +
    +  @After
    +  public void shutDown() throws IOException {
    +    hdfsCluster.shutdown();
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  @Test
    +  public void testHdfsSink() {
    +    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
    +            URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
    +    Assert.assertNotNull(ds);
    +
    +    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
    +
    +    Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
    +    Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
    +
    +    HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
    +    StateUpdater stateUpdater = consumer.getStateUpdater();
    +
    +    HdfsFileOptions options = mock(HdfsFileOptions.class);
    +    FSDataOutputStream out = mock(FSDataOutputStream.class);
    +    Whitebox.setInternalState(state, "options", options);
    +    Whitebox.setInternalState(options, "out", out);
    +
    +    List<TridentTuple> tupleList = mockTupleList();
    +
    +    for (TridentTuple t : tupleList) {
    +      stateUpdater.updateState(state, Collections.singletonList(t), null);
    +      try {
    +        verify(out).write(argThat(new HdfsArgMatcher(t)));
    --- End diff --
    
    Will fix.



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by vesense <gi...@git.apache.org>.
Github user vesense commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    >provide `working SQL example statements` to test
    
    Kafka -> HDFS
    
    ```
    CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
    CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'hdfs://localhost:9000/' TBLPROPERTIES '{"hdfs.file.path":"/unittest","hdfs.file.name":"test-$NUM.txt","hdfs.rotation.time.seconds":"120"}'
    INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
    ```
    
    >how to inject Hadoop resource files
    
    Put the required jars into the `extlib` directory and the configuration files (`core-site.xml` and `hdfs-site.xml`) into the `conf` directory.
    
    >write up content for STORM-2202
    
    Will update.



[GitHub] storm issue #1778: [STORM-2082][SQL] add sql external module storm-sql-hdfs

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1778
  
    Also confirmed it works. I'll merge this.

