You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/25 07:40:20 UTC

[1/3] incubator-kylin git commit: KYLIN-1166 CubeMigrationCLI should disable and purge the cube

Repository: incubator-kylin
Updated Branches:
  refs/heads/1.x-staging 4c4408064 -> 2b0c3a503


KYLIN-1166 CubeMigrationCLI should disable and purge the cube


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2b0c3a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2b0c3a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2b0c3a50

Branch: refs/heads/1.x-staging
Commit: 2b0c3a5038128bf0e80746e312cb2387e66c1f5b
Parents: 1cd1896
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 25 14:39:35 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 25 14:40:07 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/tools/CubeMigrationCLI.java       | 47 ++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2b0c3a50/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index ca756d1..962a4ee 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.job.tools;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +46,10 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jdo.identity.ObjectIdentity;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Created by honma on 9/3/14.
@@ -126,6 +124,7 @@ public class CubeMigrationCLI {
         if (Boolean.parseBoolean(copyAcl) == true) {
             copyACL(cube);
         }
+        purgeAndDisable(cubeName); // this should be the last action
 
         if (realExecute.equalsIgnoreCase("true")) {
             doOpts();
@@ -178,7 +177,7 @@ public class CubeMigrationCLI {
             operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
         }
     }
-    
+
     private static void copyACL(CubeInstance cube) {
         operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid() }));
     }
@@ -209,6 +208,11 @@ public class CubeMigrationCLI {
         operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName }));
     }
 
+
+    private static void purgeAndDisable(String cubeName) throws IOException {
+        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); // when executed: clears the cube's segments and sets it DISABLED in the source metadata store
+    }
+
     private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException {
 
         CubeDesc cubeDesc = cube.getDescriptor();
@@ -227,7 +231,7 @@ public class CubeMigrationCLI {
     }
 
     private static enum OptType {
-        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE
     }
 
     private static class Opt {
@@ -418,6 +422,16 @@ public class CubeMigrationCLI {
             }
             break;
         }
+        case PURGE_AND_DISABLE:{
+            String cubeName = (String) opt.params[0];
+            String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+            Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+            CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+            cube.getSegments().clear();
+            cube.setStatus(RealizationStatusEnum.DISABLED);
+            srcStore.putResource(cubeResPath, cube, cubeSerializer);
+            logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
+        }
         }
     }
 
@@ -458,6 +472,23 @@ public class CubeMigrationCLI {
             logger.info("Undo for ADD_INTO_PROJECT is ignored");
             break;
         }
+        case COPY_ACL: {
+            String cubeId = (String) opt.params[0];
+            HTableInterface destAclHtable = null;
+            try {
+                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
+
+                destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
+                destAclHtable.flushCommits();
+            } finally {
+                IOUtils.closeQuietly(destAclHtable);
+            }
+            break;
+        }
+        case PURGE_AND_DISABLE: {
+            logger.info("Undo for PURGE_AND_DISABLE is not supported");
+            break;
+        }
         }
     }
 }


[2/3] incubator-kylin git commit: KYLIN-1119 enhance find-hive-dependency to support one common case

Posted by sh...@apache.org.
KYLIN-1119 enhance find-hive-dependency to support one common case


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1cd18964
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1cd18964
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1cd18964

Branch: refs/heads/1.x-staging
Commit: 1cd189649ad6d559c0d373b1e2b31c55387aa13e
Parents: 0f8fc23
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 25 14:38:55 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 25 14:40:07 2015 +0800

----------------------------------------------------------------------
 bin/find-hive-dependency.sh | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1cd18964/bin/find-hive-dependency.sh
----------------------------------------------------------------------
diff --git a/bin/find-hive-dependency.sh b/bin/find-hive-dependency.sh
index cb5a43e..5994dda 100755
--- a/bin/find-hive-dependency.sh
+++ b/bin/find-hive-dependency.sh
@@ -44,6 +44,8 @@ then
     hadoop_home=`echo $hive_exec_path | awk -F '/hive.*/lib/' '{print $1}'`
     if [ -d "${hadoop_home}/hive-hcatalog" ]; then
       hcatalog_home=${hadoop_home}/hive-hcatalog
+    elif [ -d "${hadoop_home}/hive/hcatalog" ]; then
+      hcatalog_home=${hadoop_home}/hive/hcatalog
     else 
       echo "Couldn't locate hcatalog installation, please make sure it is installed and set HCAT_HOME to the path."
       exit 1


[3/3] incubator-kylin git commit: KYLIN-980 spill to disk when sys available memory is low

Posted by sh...@apache.org.
KYLIN-980 spill to disk when sys available memory is low


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0f8fc239
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0f8fc239
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0f8fc239

Branch: refs/heads/1.x-staging
Commit: 0f8fc239162bbca913b1eceb380d89f674928400
Parents: 4c44080
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 25 14:38:14 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 25 14:40:07 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 249 +++++++++++++++++++
 .../hadoop/cube/FactDistinctColumnsReducer.java |  33 ++-
 2 files changed, 274 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8fc239/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..4715ef6
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,249 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MemoryBudgetController { // memory budget bookkeeping; all budget amounts are in MB
+    // Consumers book memory via reserve()/reserveInsist(); when a request does not fit, booked consumers are asked to freeUp() memory.
+    private static final boolean debug = true; // when true, booking changes, reserveInsist() waits and budget/heap mismatches are logged at DEBUG level
+
+    public interface MemoryConsumer {
+        // Called when the budget is short; 'mb' is the remaining shortfall. Returns the MB actually released, which is deducted from this consumer's booking.
+        int freeUp(int mb);
+    }
+    // Thrown when a reservation cannot be satisfied; reserveInsist() also uses it to wrap an InterruptedException.
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+    // Booking record of one consumer; reservedMB is the amount that consumer currently holds.
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0); // rejects every positive request (see reserve())
+    public static final int ONE_MB = 1024 * 1024;
+    public static final long ONE_GB = 1024 * 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>(); // consumer -> its booking; modified under lock, but iterated without it in tryFreeUp()
+    private int totalReservedMB; // sum of all bookings; only accessed while holding lock
+    private final ReentrantLock lock = new ReentrantLock(); // guards totalReservedMB and booking updates; its intrinsic monitor is separately used for wait()/notifyAll()
+    // totalBudgetMB must be >= 0 and no larger than the JVM heap currently available (see getSystemAvailMB()).
+    public MemoryBudgetController(int totalBudgetMB) {
+        Preconditions.checkArgument(totalBudgetMB >= 0);
+        Preconditions.checkState(totalBudgetMB <= getSystemAvailMB());
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+    // Like reserve(), but when the budget is short waits for memory to be released and retries; an interrupt is rethrown as NotEnoughBudgetException.
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // not enough budget yet: wait below for a release, then retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) { // NOTE(review): a notifyAll() fired between the failed reserve() and wait() is missed; the thread then sleeps until the next release
+                try {
+                    lock.wait(); // woken by updateBookingWithDelta() whenever a booking shrinks
+                } catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** Sets consumer's booking to requestMB (a total, not an increment) without waiting for other threads to release memory; if the budget is short, booked consumers (possibly this one) are asked to freeUp() memory, and NotEnoughBudgetException is thrown if that is not enough. */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+        // retry until booked: a concurrent request may take the room between calculateGap() and updateBooking()
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to avoid deadlock, don't hold lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+    // Returns how many MB short the budget is for consumer to hold requestMB in total; <= 0 means the request already fits.
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+    // Asks booked consumers to freeUp() memory until 'gap' MB are released; throws NotEnoughBudgetException if they release too little.
+    private void tryFreeUp(int gap) {
+        // note: don't hold lock while calling consumer.freeUp(); implementations may take their own locks, so holding ours risks deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) { // sanity check: the remaining budget should not exceed the heap the JVM actually has available
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+    // Sets consumer's booking to requestMB; returns false if an increase no longer fits (reserve() then retries).
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            if (entry == null) {
+                if (requestMB == 0)
+                    return true;
+
+                entry = new ConsumerEntry(consumer);
+                booking.put(consumer, entry); // NOTE(review): stays booked at 0 MB if updateBookingWithDelta() below returns false
+            }
+
+            int delta = requestMB - entry.reservedMB;
+            return updateBookingWithDelta(consumer, delta);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Applies delta MB to consumer's booking; caller MUST hold lock. Returns false if a positive delta no longer fits.
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            if (delta <= 0)
+                return true;
+
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        // re-check the gap: concurrent requests may have changed totalReservedMB since calculateGap()
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+        // memory was released: wake threads waiting in reserveInsist()
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+    // Heap bytes the JVM can still use: max heap (e.g. -Xmx) minus heap currently in use; not OS-level free memory.
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+    // getSystemAvailBytes() expressed in whole MB (rounded down).
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8fc239/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 89f90ba..f18e840 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinReducer;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -38,9 +39,7 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author yangli9
@@ -69,15 +68,34 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         TblColRef col = columnList.get(key.get());
 
         HashSet<ByteArray> set = new HashSet<ByteArray>();
+        int count = 0;
         for (Text textValue : values) {
             ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
             set.add(value);
+            count++;
+            if (count % 10000 == 0 && MemoryBudgetController.getSystemAvailMB() < 100) {
+                outputDistinctValues(col, set, context);
+                set.clear();
+            }
         }
 
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
+        if (set.isEmpty() == false) {
+            outputDistinctValues(col, set, context);
+        }
+
+    }
+
+    private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = FileSystem.get(conf);
+        final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, col.getName());
+        FSDataOutputStream out;
+        if (fs.exists(outputFile)) {
+            out = fs.append(outputFile);
+        } else {
+            out = fs.create(outputFile);
+        }
 
         try {
             for (ByteArray value : set) {
@@ -87,7 +105,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         } finally {
             out.close();
         }
-
     }
 
 }