Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2018/07/24 03:21:05 UTC

[GitHub] carbondata pull request #2544: [WIP][CarbonStore] Support ingesting data fro...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/2544

    [WIP][CarbonStore] Support ingesting data from DIS

    Be sure to do all of the following checklist to help us incorporate 
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata support_dis

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2544
    
----
commit 0adf03ba5f666a79c73a73ef1b9bb1e34dfee814
Author: QiangCai <qi...@...>
Date:   2018-07-19T06:50:38Z

    fix task locality issue
    
    dependency

commit 0f0f6be778a10cd8d8443bb13455bb4883d7823b
Author: QiangCai <qi...@...>
Date:   2018-07-24T03:18:59Z

    support ingesting data from DIS

----


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    LGTM


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6368/



---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7595/



---

[GitHub] carbondata pull request #2544: [CARBONDATA-2776][CarbonStore] Support ingest...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2544#discussion_r204966885
  
    --- Diff: pom.xml ---
    @@ -110,7 +110,7 @@
       <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <snappy.version>1.1.2.6</snappy.version>
    -    <hadoop.version>2.7.2</hadoop.version>
    +    <hadoop.version>2.8.3</hadoop.version>
    --- End diff --
    
    Can we directly upgrade the Hadoop version to 2.8.3 without any other changes?


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Please rebase.


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7539/



---

[GitHub] carbondata pull request #2544: [CARBONDATA-2776][CarbonStore] Support ingest...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai closed the pull request at:

    https://github.com/apache/carbondata/pull/2544


---

[GitHub] carbondata pull request #2544: [CARBONDATA-2776][CarbonStore] Support ingest...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2544#discussion_r204966779
  
    --- Diff: store/sql/pom.xml ---
    @@ -35,6 +36,33 @@
             </exclusion>
           </exclusions>
         </dependency>
    +    <dependency>
    +      <groupId>org.apache.hadoop</groupId>
    +      <artifactId>hadoop-aws</artifactId>
    +      <version>${hadoop.version}</version>
    +      <exclusions>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>*</artifactId>
    +        </exclusion>
    +      </exclusions>
    +    </dependency>
    +    <dependency>
    +      <groupId>com.amazonaws</groupId>
    +      <artifactId>aws-java-sdk-s3</artifactId>
    +      <version>	1.10.6</version>
    --- End diff --
    
    Please trim the stray whitespace before the version number.


---

[GitHub] carbondata issue #2544: [WIP][CarbonStore] Support ingesting data from DIS

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7424/



---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6057/



---

[GitHub] carbondata pull request #2544: [CARBONDATA-2776][CarbonStore] Support ingest...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2544#discussion_r204967361
  
    --- Diff: store/sql/src/main/java/org/apache/carbondata/dis/DisProducer.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.dis;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ThreadLocalRandom;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +
    +import com.huaweicloud.dis.DIS;
    +import com.huaweicloud.dis.DISClientBuilder;
    +import com.huaweicloud.dis.exception.DISClientException;
    +import com.huaweicloud.dis.http.exception.ResourceAccessException;
    +import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
    +import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;
    +import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
    +import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
    +
    +public class DisProducer {
    +
    +  private static AtomicLong eventId = new AtomicLong(0);
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DisProducer.class.getName());
    +
    +  public static void main(String[] args) {
    +    if (args.length < 6) {
    +      System.err.println(
    +          "Usage: DisProducer <stream name> <endpoint> <region> <ak> <sk> <project id> ");
    +      return;
    +    }
    +
    +    DIS dic = DISClientBuilder.standard().withEndpoint(args[1]).withAk(args[3]).withSk(args[4])
    +        .withProjectId(args[5]).withRegion(args[2]).build();
    +
    +    Sensor sensor = new Sensor(dic, args[0]);
    +    Timer timer = new Timer();
    +    timer.schedule(sensor, 0, 5000);
    +
    +  }
    +
    +  static class Sensor extends TimerTask {
    +    private DIS dic;
    +
    +    private String streamName;
    +
    +    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    +
    +    private Random random = new Random();
    +
    +    private int i = 0;
    +    private int flag = 1;
    +
    +    Sensor(DIS dic, String streamName) {
    +      this.dic = dic;
    +      this.streamName = streamName;
    +    }
    +
    +    @Override public void run() {
    +      uploadData();
    +      //recordSensor();
    +    }
    +
    +    private void uploadData() {
    +      PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
    +      putRecordsRequest.setStreamName(streamName);
    +      List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
    +      PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    +      putRecordsRequestEntry.setData(ByteBuffer.wrap(recordSensor()));
    +      putRecordsRequestEntry
    +          .setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
    +      putRecordsRequestEntryList.add(putRecordsRequestEntry);
    +      putRecordsRequest.setRecords(putRecordsRequestEntryList);
    +
    +      LOGGER.info("========== BEGIN PUT ============");
    +
    +      PutRecordsResult putRecordsResult = null;
    +      try {
    +        putRecordsResult = dic.putRecords(putRecordsRequest);
    +      } catch (DISClientException e) {
    +        LOGGER.error(e,
    +            "Failed to get a normal response, please check params and retry." + e.getMessage());
    +      } catch (ResourceAccessException e) {
    +        LOGGER.error(e, "Failed to access endpoint. " + e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +      }
    +
    +      if (putRecordsResult != null) {
    +        LOGGER.info("Put " + putRecordsResult.getRecords().size() + " records[" + (
    +            putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get())
    +            + " successful / " + putRecordsResult.getFailedRecordCount() + " failed].");
    +
    +        for (int j = 0; j < putRecordsResult.getRecords().size(); j++) {
    +          PutRecordsResultEntry putRecordsRequestEntry1 = putRecordsResult.getRecords().get(j);
    +          if (putRecordsRequestEntry1.getErrorCode() != null) {
    +            LOGGER.error("[" + new String(
    +                putRecordsRequestEntryList.get(j).getData().array(), Charset.defaultCharset())
    +                + "] put failed, errorCode [" + putRecordsRequestEntry1.getErrorCode()
    +                + "], errorMessage [" + putRecordsRequestEntry1.getErrorMessage() + "]");
    +          } else {
    +            LOGGER.info("[" + new String(
    --- End diff --
    
    Why not use StringBuilder or String.format?
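
    For context, the suggestion above could be applied like this. A minimal, self-contained sketch: the method name `formatPutError` and its arguments are illustrative placeholders standing in for the DIS response objects, not part of the patch or the DIS SDK.

    ```java
    // Sketch of the reviewer's suggestion: build the per-record failure
    // message with String.format instead of chained concatenation.
    public class LogFormatDemo {

      // Placeholder for the values read from PutRecordsResultEntry in the patch
      static String formatPutError(String data, String errorCode, String errorMessage) {
        return String.format("[%s] put failed, errorCode [%s], errorMessage [%s]",
            data, errorCode, errorMessage);
      }

      public static void main(String[] args) {
        // prints: [payload] put failed, errorCode [DIS.4303], errorMessage [stream not found]
        System.out.println(formatPutError("payload", "DIS.4303", "stream not found"));
      }
    }
    ```

    String.format keeps a fixed message template in one readable string; StringBuilder would be the better fit if the message were assembled incrementally inside the loop.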


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6058/



---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7594/



---

[GitHub] carbondata pull request #2544: [CARBONDATA-2776][CarbonStore] Support ingest...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2544#discussion_r205995493
  
    --- Diff: store/sql/src/main/java/org/apache/carbondata/dis/DisProducer.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.dis;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ThreadLocalRandom;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +
    +import com.huaweicloud.dis.DIS;
    +import com.huaweicloud.dis.DISClientBuilder;
    +import com.huaweicloud.dis.exception.DISClientException;
    +import com.huaweicloud.dis.http.exception.ResourceAccessException;
    +import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
    +import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;
    +import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
    +import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
    +
    +public class DisProducer {
    +
    +  private static AtomicLong eventId = new AtomicLong(0);
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(DisProducer.class.getName());
    +
    +  public static void main(String[] args) {
    +    if (args.length < 6) {
    +      System.err.println(
    +          "Usage: DisProducer <stream name> <endpoint> <region> <ak> <sk> <project id> ");
    +      return;
    +    }
    +
    +    DIS dic = DISClientBuilder.standard().withEndpoint(args[1]).withAk(args[3]).withSk(args[4])
    +        .withProjectId(args[5]).withRegion(args[2]).build();
    +
    +    Sensor sensor = new Sensor(dic, args[0]);
    +    Timer timer = new Timer();
    +    timer.schedule(sensor, 0, 5000);
    +
    +  }
    +
    +  static class Sensor extends TimerTask {
    +    private DIS dic;
    +
    +    private String streamName;
    +
    +    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    +
    +    private Random random = new Random();
    +
    +    private int i = 0;
    +    private int flag = 1;
    +
    +    Sensor(DIS dic, String streamName) {
    +      this.dic = dic;
    +      this.streamName = streamName;
    +    }
    +
    +    @Override public void run() {
    +      uploadData();
    +      //recordSensor();
    +    }
    +
    +    private void uploadData() {
    +      PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
    +      putRecordsRequest.setStreamName(streamName);
    +      List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
    +      PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    +      putRecordsRequestEntry.setData(ByteBuffer.wrap(recordSensor()));
    +      putRecordsRequestEntry
    +          .setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
    +      putRecordsRequestEntryList.add(putRecordsRequestEntry);
    +      putRecordsRequest.setRecords(putRecordsRequestEntryList);
    +
    +      LOGGER.info("========== BEGIN PUT ============");
    +
    +      PutRecordsResult putRecordsResult = null;
    +      try {
    +        putRecordsResult = dic.putRecords(putRecordsRequest);
    +      } catch (DISClientException e) {
    +        LOGGER.error(e,
    +            "Failed to get a normal response, please check params and retry." + e.getMessage());
    +      } catch (ResourceAccessException e) {
    +        LOGGER.error(e, "Failed to access endpoint. " + e.getMessage());
    +      } catch (Exception e) {
    +        LOGGER.error(e, e.getMessage());
    +      }
    +
    +      if (putRecordsResult != null) {
    +        LOGGER.info("Put " + putRecordsResult.getRecords().size() + " records[" + (
    +            putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get())
    +            + " successful / " + putRecordsResult.getFailedRecordCount() + " failed].");
    +
    +        for (int j = 0; j < putRecordsResult.getRecords().size(); j++) {
    +          PutRecordsResultEntry putRecordsRequestEntry1 = putRecordsResult.getRecords().get(j);
    +          if (putRecordsRequestEntry1.getErrorCode() != null) {
    +            LOGGER.error("[" + new String(
    +                putRecordsRequestEntryList.get(j).getData().array(), Charset.defaultCharset())
    +                + "] put failed, errorCode [" + putRecordsRequestEntry1.getErrorCode()
    +                + "], errorMessage [" + putRecordsRequestEntry1.getErrorMessage() + "]");
    +          } else {
    +            LOGGER.info("[" + new String(
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6293/



---

[GitHub] carbondata issue #2544: [CARBONDATA-2776][CarbonStore] Support ingesting dat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2544
  
    Merging into the carbonstore branch.


---