You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:40 UTC
[42/47] incubator-carbondata git commit: Merge remote-tracking branch
'carbon_master/master' into apache/master
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
index d0b0453,0000000..82fac7b
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/ZooKeeperLocking.java
@@@ -1,168 -1,0 +1,195 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
++import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
++import org.carbondata.core.util.CarbonProperties;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Handles the ZooKeeper-based locking implementation.
+ */
+public class ZooKeeperLocking extends AbstractCarbonLock {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
+
+ /**
+ * zk is the zookeeper client instance
+ */
+ private static ZooKeeper zk;
+
+ /**
+ * zooKeeperLocation is the location in the zoo keeper file system where the locks will be
+ * maintained.
+ */
+ private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION;
+
+ /**
+ * Unique folder for each table with DatabaseName_TableName
+ */
+ private final String tableIdFolder;
+
+ /**
+ * lockName is the name of the lock to use. This name should be the same for every process that
+ * wants to share the same lock.
+ */
+ private String lockName;
+
+ /**
+ * lockPath is the unique path created for each instance of the carbon lock.
+ */
+ private String lockPath;
+
+ private String lockTypeFolder;
+
++ public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
++ this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
++ lockFile);
++ }
++
+ /**
- * @param tableIdentifier
++ * @param lockLocation
+ * @param lockFile
+ */
- public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
++ public ZooKeeperLocking(String lockLocation, String lockFile) {
+ this.lockName = lockFile;
- this.tableIdFolder =
- zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
- + '.' + tableIdentifier.getTableName();
++ this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
+
- zk = ZookeeperInit.getInstance().getZookeeper();
++ String zooKeeperUrl =
++ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
++ zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
+
- this.lockTypeFolder =
- zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
- + '.' + tableIdentifier.getTableName() + CarbonCommonConstants.FILE_SEPARATOR
- + lockFile;
++ this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
++ + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+ try {
+ createBaseNode();
+ // if exists() returns null then the path doesn't exist, so create it.
+ if (null == zk.exists(this.tableIdFolder, true)) {
- zk.create(this.tableIdFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
++ createRecursivly(this.tableIdFolder);
+ }
+ // if exists() returns null then the path doesn't exist, so create it.
+ if (null == zk.exists(this.lockTypeFolder, true)) {
+ zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ LOGGER.error(e, e.getMessage());
+ }
+ initRetry();
+ }
+
+ /**
+ * Creates the base znode under which all the lock znodes (lock files) are maintained.
+ */
+ private void createBaseNode() throws KeeperException, InterruptedException {
+ if (null == zk.exists(zooKeeperLocation, true)) {
+ // create the base znode under which all the lock znodes (lock files) are maintained.
+ zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ }
+
+ /**
++ * Creates the ZooKeeper node, and any missing parent nodes, if it does not exist.
++ * @param path
++ * @throws KeeperException
++ * @throws InterruptedException
++ */
++ private void createRecursivly(String path) throws KeeperException, InterruptedException {
++ try {
++ if (zk.exists(path, true) == null && path.length() > 0) {
++ String temp = path.substring(0, path.lastIndexOf(File.separator));
++ createRecursivly(temp);
++ zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
++ } else {
++ return;
++ }
++ } catch (KeeperException e) {
++ throw e;
++ } catch (InterruptedException e) {
++ throw e;
++ }
++
++ }
++ /**
+ * Handling of the locking mechanism using zoo keeper.
+ */
+ @Override public boolean lock() {
+ try {
+ // create the lock file with lockName.
+ lockPath =
+ zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName, null,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+ // get the children present in lockTypeFolder.
+ List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
+
+ // sort the children
+ Collections.sort(nodes);
+
+ // the logic is: for each lock request zookeeper creates a node whose name ends with
+ // incrementing digits,
+ // so the first request will be 00001, the next 00002 and so on.
+ // if the current request is 00002 and a previous request (00001) is still present then
+ // getChildren will return both nodes.
+ // after sorting we check whether the lock path is the first entry; if it is, the lock
+ // has been acquired.
+
+ if (lockPath.endsWith(nodes.get(0))) {
+ return true;
+ } else {
+ // if locking failed, delete the lock node just created, because the next attempt will
+ // create a new lock node.
+ zk.delete(lockPath, -1);
+ return false;
+ }
+ } catch (KeeperException | InterruptedException e) {
+ LOGGER.error(e, e.getMessage());
+ return false;
+ }
+ }
+
+ /**
+ * @return status indicating whether the lock file was unlocked or not.
+ */
+ @Override public boolean unlock() {
+ try {
+ // exists will return null if the path doesn't exists.
+ if (null != zk.exists(lockPath, true)) {
+ zk.delete(lockPath, -1);
+ lockPath = null;
+ }
+ } catch (KeeperException | InterruptedException e) {
+ LOGGER.error(e, e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 6f7a2b1,6b997d5..d69e42c
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@@ -258,8 -258,8 +258,8 @@@ public class MDKeyGenStep extends BaseS
private boolean setStepConfiguration() {
this.tableName = meta.getTableName();
storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(),
+ .getLocalDataFolderLocation(meta.getDatabaseName(), meta.getTableName(),
- String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"");
+ String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"", false);
isNoDictionaryDimension =
RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
isUseInvertedIndex =
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStepMeta.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index cc2bd17,bb8ab2a..811a8ee
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@@ -520,8 -524,8 +520,8 @@@ public class SortDataRows
*/
private void updateSortTempFileLocation() {
String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(schemaName, tableName, taskNo, partitionID,
+ .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID,
- segmentId);
+ segmentId, false);
this.tempFileLocation =
carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
LOGGER.info("temp file location" + this.tempFileLocation);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
index b20bcf0,b72de8e..ee6cfd0
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
@@@ -33,7 -33,9 +33,8 @@@ import java.util.concurrent.locks.Reent
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.cache.dictionary.Dictionary;
+ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.file.manager.composite.IFileManagerComposite;
import org.carbondata.core.keygenerator.KeyGenException;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index fd268ab,c2093ad..003b629
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@@ -48,8 -48,12 +48,9 @@@ import org.carbondata.core.cache.dictio
import org.carbondata.core.carbon.metadata.CarbonMetadata;
import org.carbondata.core.carbon.metadata.datatype.DataType;
import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.file.manager.composite.FileData;
-import org.carbondata.core.file.manager.composite.FileManager;
-import org.carbondata.core.file.manager.composite.IFileManagerComposite;
import org.carbondata.core.keygenerator.KeyGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@@ -702,8 -706,8 +704,8 @@@ public class CarbonCSVBasedSeqGenStep e
*/
private void updateStoreLocation() {
loadFolderLoc = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(), meta.getTaskNo(),
+ .getLocalDataFolderLocation(meta.getDatabaseName(), meta.getTableName(), meta.getTaskNo(),
- meta.getPartitionID(), meta.getSegmentId()+"");
+ meta.getPartitionID(), meta.getSegmentId()+"", false);
}
private String getBadLogStoreLocation(String storeLocation) {
@@@ -1824,5 -1836,45 +1834,45 @@@
}
}
+ private CarbonDimension[] populateNameToCarbonDimensionMap() {
+ CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- meta.getSchemaName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
++ meta.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + meta.getTableName());
+ List<CarbonDimension> dimensionsList = carbonTable.getDimensionByTableName(meta.getTableName());
+ CarbonDimension[] dimensionOrdinalToDimensionMapping =
+ new CarbonDimension[meta.getColumnSchemaDetailsWrapper().getColumnSchemaDetailsMap()
+ .size()];
+ List<CarbonDimension> dimListExcludingNoDictionaryColumn = dimensionsList;
+ if (null != meta.getNoDictionaryDims() && meta.getNoDictionaryDims().length() > 0) {
+ dimListExcludingNoDictionaryColumn =
+ new ArrayList<>(dimensionsList.size() - meta.noDictionaryCols.length);
+ for (CarbonDimension dimension : dimensionsList) {
+ if (!dimension.getEncoder().isEmpty()) {
+ dimListExcludingNoDictionaryColumn.add(dimension);
+ }
+ }
+ }
+ for (int i = 0; i < dimListExcludingNoDictionaryColumn.size(); i++) {
+ CarbonDimension dimension = dimListExcludingNoDictionaryColumn.get(meta.memberMapping[i]);
+ if (dimension.isComplex()) {
+ populateComplexDimension(dimensionOrdinalToDimensionMapping, dimension);
+ } else {
+ dimensionOrdinalToDimensionMapping[meta.memberMapping[i]] = dimension;
+ }
+ }
+ return dimensionOrdinalToDimensionMapping;
+ }
+
+ private void populateComplexDimension(CarbonDimension[] dimensionOrdinalToDimensionMapping,
+ CarbonDimension dimension) {
+ List<CarbonDimension> listOfChildDimensions = dimension.getListOfChildDimensions();
+ for (CarbonDimension childDimension : listOfChildDimensions) {
+ if (childDimension.isComplex()) {
+ populateComplexDimension(dimensionOrdinalToDimensionMapping, childDimension);
+ } else {
+ dimensionOrdinalToDimensionMapping[childDimension.getOrdinal()] = childDimension;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
----------------------------------------------------------------------