Posted to issues@carbondata.apache.org by ajantha-bhat <gi...@git.apache.org> on 2018/08/21 10:10:06 UTC

[GitHub] carbondata pull request #2646: [WIP] Support SDK writer as thread safe api

GitHub user ajantha-bhat opened a pull request:

    https://github.com/apache/carbondata/pull/2646

    [WIP] Support SDK writer as thread safe api

    Be sure to complete all of the following checklist items to help us
    incorporate your contribution quickly and easily:
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ajantha-bhat/carbondata issue_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2646
    
----
commit d239b6e379e6dd2f4627df33b93aa2217fb94558
Author: ajantha-bhat <aj...@...>
Date:   2018-08-21T05:22:35Z

    make sdk writer thread safe

commit f54bd7dc349b71a93c7343ab8aac12aacd90e737
Author: ajantha-bhat <aj...@...>
Date:   2018-08-21T10:05:31Z

    update sdk guide

----


---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7972/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6694/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6696/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6322/



---

[GitHub] carbondata pull request #2646: [WIP] Support SDK writer as thread safe api

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r211900027
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -45,17 +45,21 @@
     
       private RowBatch readBatch;
     
    +  private static Object lockObject = new Object();
    +
       private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
     
       public void write(Object[] row) throws InterruptedException {
         if (close) {
           // already might be closed forcefully
           return;
         }
    -    if (!loadBatch.addRow(row)) {
    -      loadBatch.readyRead();
    -      queue.put(loadBatch);
    -      loadBatch = new RowBatch(batchSize);
    +    synchronized (lockObject) {
    --- End diff --
    
    Yes, I can use `this`, as this is object-level synchronization.


---

[GitHub] carbondata pull request #2646: [WIP] Support SDK writer as thread safe api

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r211841415
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -45,17 +45,21 @@
     
       private RowBatch readBatch;
     
    +  private static Object lockObject = new Object();
    +
       private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
     
       public void write(Object[] row) throws InterruptedException {
         if (close) {
           // already might be closed forcefully
           return;
         }
    -    if (!loadBatch.addRow(row)) {
    -      loadBatch.readyRead();
    -      queue.put(loadBatch);
    -      loadBatch = new RowBatch(batchSize);
    +    synchronized (lockObject) {
    --- End diff --
    
    Why not use `this`?


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6335/



---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Closing this, as we handle this in a different way in PR #2653.


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    retest SDV please


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6749/



---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6372/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7970/



---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7974/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7962/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat closed the pull request at:

    https://github.com/apache/carbondata/pull/2646


---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6686/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350373
  
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    --- End diff --
    
    `writeLogic` should be `WriteLogic`, since Java class names use UpperCamelCase.


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    retest this please


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6698/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350482
  
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    +      }
    +      executorService.shutdown();
    +      executorService.awaitTermination(2, TimeUnit.HOURS);
    +      writer.close();
    +    } catch (Exception e) {
    +      e.printStackTrace();
    --- End diff --
    
    This should fail the test instead of only printing the stack trace.
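A minimal sketch of the outer half of this suggestion, written in plain Java so it runs without JUnit. The helper `runAll` and the class `FailFastTermination` are hypothetical names: the helper lets `InterruptedException` propagate instead of catching `Exception` and printing it, and it treats a `false` result from `ExecutorService.awaitTermination` (a timeout) as a failure. In a JUnit 4 test the same effect comes from declaring `throws Exception` on the test method, or from calling `Assert.fail(...)` inside the catch block.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FailFastTermination {

  // Declares its checked exception instead of catching it, so a test method
  // declared "throws Exception" fails rather than just printing a trace.
  static void runAll(Runnable[] tasks, long timeout, TimeUnit unit) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(tasks.length);
    for (Runnable task : tasks) {
      pool.submit(task);
    }
    pool.shutdown();
    // awaitTermination returns false on timeout; ignoring that result would let
    // the test move on to the read phase while a writer thread is still running.
    if (!pool.awaitTermination(timeout, unit)) {
      pool.shutdownNow(); // interrupt the stragglers
      throw new AssertionError("tasks did not finish within " + timeout + " " + unit);
    }
  }
}
```

With this shape, the reader-side verification only runs after every writer thread has actually finished.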


---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6321/



---

[GitHub] carbondata pull request #2646: [WIP] Support SDK writer as thread safe api

Posted by ajantha-bhat <gi...@git.apache.org>.
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r211914760
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -45,17 +45,21 @@
     
       private RowBatch readBatch;
     
    +  private static Object lockObject = new Object();
    +
       private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
     
       public void write(Object[] row) throws InterruptedException {
         if (close) {
           // already might be closed forcefully
           return;
         }
    -    if (!loadBatch.addRow(row)) {
    -      loadBatch.readyRead();
    -      queue.put(loadBatch);
    -      loadBatch = new RowBatch(batchSize);
    +    synchronized (lockObject) {
    --- End diff --
    
    If I use `this`, FindBugs thinks I also need to synchronize the `hasNext()` and `close()` methods; it reports inconsistent synchronization. Hence I used this separate lock variable to avoid that.
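A minimal sketch of a dedicated lock object, assuming a simplified stand-in for the wrapper: the class `LockedBatchWriter`, its `List<Object[]>` batches and its `flush()` method are hypothetical, not CarbonData's `RowBatch` API. One difference from the diff is deliberate: the diff declares `lockObject` as `static`, which makes every `CarbonOutputIteratorWrapper` instance in the JVM share a single monitor (class-level rather than object-level locking). A `private final` instance field keeps a separate lock per wrapper while still leaving the `this` monitor unused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical, simplified stand-in for the write path of CarbonOutputIteratorWrapper.
class LockedBatchWriter {
  private final int batchSize;
  // Dedicated lock, one per writer instance (not static), so separate
  // writers never contend with each other and the `this` monitor stays unused.
  private final Object lock = new Object();
  private final ArrayBlockingQueue<List<Object[]>> queue = new ArrayBlockingQueue<>(10);
  // Guarded by `lock`.
  private List<Object[]> loadBatch;

  LockedBatchWriter(int batchSize) {
    this.batchSize = batchSize;
    this.loadBatch = new ArrayList<>(batchSize);
  }

  // Safe to call from many threads: adding a row and swapping in a fresh
  // batch happen atomically under the lock.
  void write(Object[] row) throws InterruptedException {
    synchronized (lock) {
      loadBatch.add(row);
      if (loadBatch.size() == batchSize) {
        queue.put(loadBatch); // blocks, while holding the lock, if the queue is full
        loadBatch = new ArrayList<>(batchSize);
      }
    }
  }

  // Hands over the last, partially filled batch, e.g. when the writer is closed.
  void flush() throws InterruptedException {
    synchronized (lock) {
      if (!loadBatch.isEmpty()) {
        queue.put(loadBatch);
        loadBatch = new ArrayList<>(batchSize);
      }
    }
  }

  ArrayBlockingQueue<List<Object[]>> queue() {
    return queue;
  }
}
```

Because the lock is held across `queue.put`, a full queue in this sketch stalls every writer thread until a consumer drains a batch.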



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212353990
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -45,17 +45,21 @@
     
       private RowBatch readBatch;
     
    +  private static Object lockObject = new Object();
    +
       private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
     
       public void write(Object[] row) throws InterruptedException {
         if (close) {
           // already might be closed forcefully
           return;
         }
    -    if (!loadBatch.addRow(row)) {
    -      loadBatch.readyRead();
    -      queue.put(loadBatch);
    -      loadBatch = new RowBatch(batchSize);
    +    synchronized (lockObject) {
    --- End diff --
    
    I feel that `loadBatch` should be a parameter of this function, so that the caller can accumulate rows into a batch in its own thread and then call this write function to add the batch to the shared queue.
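The suggestion can be sketched as follows, assuming hypothetical classes `SharedBatchQueue` and `PerThreadBatcher` with `List<Object[]>` batches in place of `RowBatch`. Each producer thread owns its own batcher, so filling a batch needs no locking; the only shared step is `write(batch)`, which relies on `ArrayBlockingQueue.put` being thread-safe. Per the `BlockingQueue` contract, the queue also makes each batch's contents visible to the thread that later takes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical shared side: only whole batches cross threads.
class SharedBatchQueue {
  private final BlockingQueue<List<Object[]>> queue;

  SharedBatchQueue(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // No explicit lock: ArrayBlockingQueue.put is thread-safe, and the batch
  // was filled by a single thread before being handed over.
  void write(List<Object[]> batch) throws InterruptedException {
    queue.put(batch);
  }

  BlockingQueue<List<Object[]>> queue() {
    return queue;
  }
}

// Hypothetical caller side: one instance per producer thread, never shared.
class PerThreadBatcher {
  private final SharedBatchQueue target;
  private final int batchSize;
  private List<Object[]> batch;

  PerThreadBatcher(SharedBatchQueue target, int batchSize) {
    this.target = target;
    this.batchSize = batchSize;
    this.batch = new ArrayList<>(batchSize);
  }

  void addRow(Object[] row) throws InterruptedException {
    batch.add(row);
    if (batch.size() == batchSize) {
      target.write(batch);
      batch = new ArrayList<>(batchSize);
    }
  }

  // Hands over this thread's last, partially filled batch.
  void finish() throws InterruptedException {
    if (!batch.isEmpty()) {
      target.write(batch);
      batch = new ArrayList<>(batchSize);
    }
  }
}
```

The trade-off is that each thread must hand over its final partial batch explicitly (`finish()` here), since no other thread can see it.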


---

[GitHub] carbondata issue #2646: [CARBONDATA-2874] Support SDK writer as thread safe ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8025/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7963/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6313/



---

[GitHub] carbondata pull request #2646: [CARBONDATA-2874] Support SDK writer as threa...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2646#discussion_r212350574
  
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java ---
    @@ -0,0 +1,102 @@
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +/**
    + * multi-thread Test suite for {@link CSVCarbonWriter}
    + */
    +public class ConcurrentSdkWriterTest {
    +
    +  @Test public void testWriteFiles() throws IOException {
    +    String path = "./testWriteFiles";
    +    FileUtils.deleteDirectory(new File(path));
    +
    +    Field[] fields = new Field[2];
    +    fields[0] = new Field("name", DataTypes.STRING);
    +    fields[1] = new Field("age", DataTypes.INT);
    +
    +    int poolSize = 6;
    +    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    +    try {
    +      CarbonWriterBuilder builder =
    +          CarbonWriter.builder().outputPath(path).uniqueIdentifier(555).taskNo(123);
    +      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
    +      // write in multi-thread
    +      for (int i = 0; i < poolSize; i++) {
    +        executorService.submit(new writeLogic(writer));
    +      }
    +      executorService.shutdown();
    +      executorService.awaitTermination(2, TimeUnit.HOURS);
    +      writer.close();
    +    } catch (Exception e) {
    +      e.printStackTrace();
    +    }
    +
    +    // read the files and verify the count
    +    CarbonReader reader;
    +    try {
    +      reader =
    +          CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build();
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] row = (Object[]) reader.readNextRow();
    +        i++;
    +      }
    +      // count should be 60 records
    +      Assert.assertEquals(i, 60);
    +      reader.close();
    +    } catch (InterruptedException e) {
    +      e.printStackTrace();
    +    }
    +
    +    FileUtils.deleteDirectory(new File(path));
    +  }
    +
    +  class writeLogic implements Runnable {
    +    CarbonWriter writer;
    +
    +    writeLogic(CarbonWriter writer) {
    +      this.writer = writer;
    +    }
    +
    +    @Override public void run() {
    +
    +      try {
    +        for (int i = 0; i < 10; i++) {
    +          writer.write(new String[] { "robot" + (i % 10), String.valueOf(i),
    +              String.valueOf((double) i / 2) });
    +        }
    +      } catch (IOException e) {
    +        e.printStackTrace();
    --- End diff --
    
    This should fail the test instead of only printing the stack trace.
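A sketch of one way to make worker-thread failures fail the test, assuming a hypothetical `RowSink` interface in place of `CarbonWriter` (only its `write` call matters here); `CallableWriteLogic` and `writeConcurrently` are likewise illustrative names. The worker implements `Callable<Void>` instead of `Runnable`, so `call()` may throw `IOException`, and `Future.get()` then rethrows it on the test thread wrapped in an `ExecutionException`. The worker class is also named `WriteLogic` in UpperCamelCase, and each row carries only the two columns (`name`, `age`) declared in the test's schema.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableWriteLogic {

  // Hypothetical stand-in for CarbonWriter; only write(...) matters here.
  interface RowSink {
    void write(String[] row) throws IOException;
  }

  // UpperCamelCase name; Callable instead of Runnable so that an IOException
  // escapes call() rather than being printed and swallowed inside the thread.
  static class WriteLogic implements Callable<Void> {
    private final RowSink writer;

    WriteLogic(RowSink writer) {
      this.writer = writer;
    }

    @Override
    public Void call() throws IOException {
      for (int i = 0; i < 10; i++) {
        // Two values per row, matching the two-field schema (name, age).
        writer.write(new String[] { "robot" + i, String.valueOf(i) });
      }
      return null;
    }
  }

  // Runs one WriteLogic per thread; Future.get() rethrows a worker's
  // IOException wrapped in an ExecutionException, which fails the caller.
  static void writeConcurrently(RowSink sink, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        futures.add(pool.submit(new WriteLogic(sink)));
      }
      for (Future<Void> future : futures) {
        future.get();
      }
    } finally {
      pool.shutdown();
    }
  }
}
```

Called from a JUnit test method declared `throws Exception`, any worker's `IOException` surfaces as a test failure.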


---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6320/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    Build Success with Spark 2.2.1. Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6687/



---

[GitHub] carbondata issue #2646: [WIP] Support SDK writer as thread safe api

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2646
  
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6314/



---