You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:40 UTC

[42/47] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
index d0b0453,0000000..82fac7b
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
@@@ -1,168 -1,0 +1,195 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.lcm.locks;
 +
++import java.io.File;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.carbondata.common.logging.LogService;
 +import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.carbon.CarbonTableIdentifier;
 +import org.carbondata.core.constants.CarbonCommonConstants;
++import org.carbondata.core.util.CarbonProperties;
 +
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.ZooDefs.Ids;
 +import org.apache.zookeeper.ZooKeeper;
 +
 +/**
 + * For Handling the zookeeper locking implementation
 + */
 +public class ZooKeeperLocking extends AbstractCarbonLock {
 +
 +  private static final LogService LOGGER =
 +      LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
 +
 +  /**
 +   * zk is the zookeeper client instance
 +   */
 +  private static ZooKeeper zk;
 +
 +  /**
 +   * zooKeeperLocation is the location in the zoo keeper file system where the locks will be
 +   * maintained.
 +   */
 +  private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION;
 +
 +  /**
 +   * Unique folder for each table with DatabaseName_TableName
 +   */
 +  private final String tableIdFolder;
 +
 +  /**
 +   * lockName is the name of the lock to use. This name should be same for every process that want
 +   * to share the same lock
 +   */
 +  private String lockName;
 +
 +  /**
 +   * lockPath is the unique path created for the each instance of the carbon lock.
 +   */
 +  private String lockPath;
 +
 +  private String lockTypeFolder;
 +
++  public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
++    this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
++        lockFile);
++  }
++
 +  /**
-    * @param tableIdentifier
++   * @param lockLocation
 +   * @param lockFile
 +   */
-   public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
++  public ZooKeeperLocking(String lockLocation, String lockFile) {
 +    this.lockName = lockFile;
-     this.tableIdFolder =
-         zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
-             + '.' + tableIdentifier.getTableName();
++    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
 +
-     zk = ZookeeperInit.getInstance().getZookeeper();
++    String zooKeeperUrl =
++        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
++    zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
 +
-     this.lockTypeFolder =
-         zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
-             + '.' + tableIdentifier.getTableName() + CarbonCommonConstants.FILE_SEPARATOR
-             + lockFile;
++    this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
++        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
 +    try {
 +      createBaseNode();
 +      // if exists returns null then path doesnt exist. so creating.
 +      if (null == zk.exists(this.tableIdFolder, true)) {
-         zk.create(this.tableIdFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
++        createRecursivly(this.tableIdFolder);
 +      }
 +      // if exists returns null then path doesnt exist. so creating.
 +      if (null == zk.exists(this.lockTypeFolder, true)) {
 +        zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 +      }
 +    } catch (KeeperException | InterruptedException e) {
 +      LOGGER.error(e, e.getMessage());
 +    }
 +    initRetry();
 +  }
 +
 +  /**
 +   * Creating a znode in which all the znodes (lock files )are maintained.
 +   */
 +  private void createBaseNode() throws KeeperException, InterruptedException {
 +    if (null == zk.exists(zooKeeperLocation, true)) {
 +      // creating a znode in which all the znodes (lock files )are maintained.
 +      zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 +    }
 +  }
 +
 +  /**
++   * Create zookeepr node if not exist
++   * @param path
++   * @throws KeeperException
++   * @throws InterruptedException
++   */
++  private void createRecursivly(String path) throws KeeperException, InterruptedException {
++    try {
++      if (zk.exists(path, true) == null && path.length() > 0) {
++        String temp = path.substring(0, path.lastIndexOf(File.separator));
++        createRecursivly(temp);
++        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
++      } else {
++        return;
++      }
++    } catch (KeeperException e) {
++      throw e;
++    } catch (InterruptedException e) {
++      throw e;
++    }
++
++  }
++  /**
 +   * Handling of the locking mechanism using zoo keeper.
 +   */
 +  @Override public boolean lock() {
 +    try {
 +      // create the lock file with lockName.
 +      lockPath =
 +          zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName, null,
 +              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 +
 +      // get the children present in zooKeeperLocation.
 +      List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
 +
 +      // sort the childrens
 +      Collections.sort(nodes);
 +
 +      // here the logic is , for each lock request zookeeper will create a file ending with
 +      // incremental digits.
 +      // so first request will be 00001 next is 00002 and so on.
 +      // if the current request is 00002 and already one previous request(00001) is present then get
 +      // children will give both nodes.
 +      // after the sort we are checking if the lock path is first or not .if it is first then lock
 +      // has been acquired.
 +
 +      if (lockPath.endsWith(nodes.get(0))) {
 +        return true;
 +      } else {
 +        // if locking failed then deleting the created lock as next time again new lock file will be
 +        // created.
 +        zk.delete(lockPath, -1);
 +        return false;
 +      }
 +    } catch (KeeperException | InterruptedException e) {
 +      LOGGER.error(e, e.getMessage());
 +      return false;
 +    }
 +  }
 +
 +  /**
 +   * @return status where lock file is unlocked or not.
 +   */
 +  @Override public boolean unlock() {
 +    try {
 +      // exists will return null if the path doesn't exists.
 +      if (null != zk.exists(lockPath, true)) {
 +        zk.delete(lockPath, -1);
 +        lockPath = null;
 +      }
 +    } catch (KeeperException | InterruptedException e) {
 +      LOGGER.error(e, e.getMessage());
 +      return false;
 +    }
 +    return true;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 6f7a2b1,6b997d5..d69e42c
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@@ -258,8 -258,8 +258,8 @@@ public class MDKeyGenStep extends BaseS
    private boolean setStepConfiguration() {
      this.tableName = meta.getTableName();
      storeLocation = CarbonDataProcessorUtil
 -        .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(),
 +        .getLocalDataFolderLocation(meta.getDatabaseName(), meta.getTableName(),
-             String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"");
+             String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"", false);
      isNoDictionaryDimension =
          RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
      isUseInvertedIndex =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index cc2bd17,bb8ab2a..811a8ee
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@@ -520,8 -524,8 +520,8 @@@ public class SortDataRows 
     */
    private void updateSortTempFileLocation() {
      String carbonDataDirectoryPath = CarbonDataProcessorUtil
 -        .getLocalDataFolderLocation(schemaName, tableName, taskNo, partitionID,
 +        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID,
-             segmentId);
+             segmentId, false);
      this.tempFileLocation =
          carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
      LOGGER.info("temp file location" + this.tempFileLocation);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
index b20bcf0,b72de8e..ee6cfd0
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
@@@ -33,7 -33,9 +33,8 @@@ import java.util.concurrent.locks.Reent
  import org.carbondata.common.logging.LogService;
  import org.carbondata.common.logging.LogServiceFactory;
  import org.carbondata.core.cache.dictionary.Dictionary;
+ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
  import org.carbondata.core.constants.CarbonCommonConstants;
 -import org.carbondata.core.file.manager.composite.IFileManagerComposite;
  import org.carbondata.core.keygenerator.KeyGenException;
  import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
  import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index fd268ab,c2093ad..003b629
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@@ -48,8 -48,12 +48,9 @@@ import org.carbondata.core.cache.dictio
  import org.carbondata.core.carbon.metadata.CarbonMetadata;
  import org.carbondata.core.carbon.metadata.datatype.DataType;
  import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
  import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
  import org.carbondata.core.constants.CarbonCommonConstants;
 -import org.carbondata.core.file.manager.composite.FileData;
 -import org.carbondata.core.file.manager.composite.FileManager;
 -import org.carbondata.core.file.manager.composite.IFileManagerComposite;
  import org.carbondata.core.keygenerator.KeyGenerator;
  import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
  import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@@ -702,8 -706,8 +704,8 @@@ public class CarbonCSVBasedSeqGenStep e
     */
    private void updateStoreLocation() {
      loadFolderLoc = CarbonDataProcessorUtil
 -        .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(), meta.getTaskNo(),
 +        .getLocalDataFolderLocation(meta.getDatabaseName(), meta.getTableName(), meta.getTaskNo(),
-             meta.getPartitionID(), meta.getSegmentId()+"");
+             meta.getPartitionID(), meta.getSegmentId()+"", false);
    }
  
    private String getBadLogStoreLocation(String storeLocation) {
@@@ -1824,5 -1836,45 +1834,45 @@@
      }
    }
  
+   private CarbonDimension[] populateNameToCarbonDimensionMap() {
+     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
 -        meta.getSchemaName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
++        meta.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
+     List<CarbonDimension> dimensionsList = carbonTable.getDimensionByTableName(meta.getTableName());
+     CarbonDimension[] dimensionOrdinalToDimensionMapping =
+         new CarbonDimension[meta.getColumnSchemaDetailsWrapper().getColumnSchemaDetailsMap()
+             .size()];
+     List<CarbonDimension> dimListExcludingNoDictionaryColumn = dimensionsList;
+     if (null != meta.getNoDictionaryDims() && meta.getNoDictionaryDims().length() > 0) {
+       dimListExcludingNoDictionaryColumn =
+           new ArrayList<>(dimensionsList.size() - meta.noDictionaryCols.length);
+       for (CarbonDimension dimension : dimensionsList) {
+         if (!dimension.getEncoder().isEmpty()) {
+           dimListExcludingNoDictionaryColumn.add(dimension);
+         }
+       }
+     }
+     for (int i = 0; i < dimListExcludingNoDictionaryColumn.size(); i++) {
+       CarbonDimension dimension = dimListExcludingNoDictionaryColumn.get(meta.memberMapping[i]);
+       if (dimension.isComplex()) {
+         populateComplexDimension(dimensionOrdinalToDimensionMapping, dimension);
+       } else {
+         dimensionOrdinalToDimensionMapping[meta.memberMapping[i]] = dimension;
+       }
+     }
+     return dimensionOrdinalToDimensionMapping;
+   }
+ 
+   private void populateComplexDimension(CarbonDimension[] dimensionOrdinalToDimensionMapping,
+       CarbonDimension dimension) {
+     List<CarbonDimension> listOfChildDimensions = dimension.getListOfChildDimensions();
+     for (CarbonDimension childDimension : listOfChildDimensions) {
+       if (childDimension.isComplex()) {
+         populateComplexDimension(dimensionOrdinalToDimensionMapping, childDimension);
+       } else {
+         dimensionOrdinalToDimensionMapping[childDimension.getOrdinal()] = childDimension;
+       }
+     }
+   }
+ 
  }
  

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
----------------------------------------------------------------------