You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/03/26 11:34:57 UTC

[02/13] drill git commit: DRILL-6248: Added limit push down support for system tables

DRILL-6248: Added limit push down support for system tables

1. PojoRecordReader started returning data in batches instead of returning all in one batch. The default batch size is 4000.
2. SystemTableScan supports limit push down while extracting data in the record reader based on the given max records to read.
3. Profiles and profiles_json tables apply limit push down while extracting data from store accessing profiles by range.

closes #1183


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8663e8a5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8663e8a5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8663e8a5

Branch: refs/heads/master
Commit: 8663e8a56caae10623c7135292d00df49fddf419
Parents: 3595664
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Tue Mar 20 20:26:42 2018 +0200
Committer: Vitalii Diravka <vi...@gmail.com>
Committed: Sat Mar 24 20:35:32 2018 +0200

----------------------------------------------------------------------
 .../exec/physical/base/AbstractGroupScan.java   |  13 +--
 .../drill/exec/physical/base/GroupScan.java     |  38 +++---
 .../exec/store/parquet/ParquetGroupScan.java    |   2 +-
 .../store/pojo/AbstractPojoRecordReader.java    |  12 +-
 .../store/pojo/DynamicPojoRecordReader.java     |  21 +++-
 .../drill/exec/store/pojo/PojoRecordReader.java |   6 +
 .../exec/store/sys/ProfileInfoIterator.java     |  33 +++---
 .../drill/exec/store/sys/ProfileIterator.java   | 116 ++++++++++++++-----
 .../exec/store/sys/ProfileJsonIterator.java     |  34 +++---
 .../drill/exec/store/sys/SystemTable.java       |  36 +++---
 .../exec/store/sys/SystemTableBatchCreator.java |   4 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |  10 +-
 .../drill/exec/store/sys/SystemTableScan.java   |  69 ++++++++---
 .../drill/exec/store/sys/TestSystemTable.java   |  16 ++-
 14 files changed, 265 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index ac42766..b2ddf68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,10 +30,8 @@ import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.Iterators;
 
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
 
   public AbstractGroupScan(String userName) {
     super(userName);
@@ -45,7 +43,7 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
 
   @Override
   public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
+    return Collections.emptyIterator();
   }
 
   @Override
@@ -135,7 +133,6 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
 
   /**
    * Default is not to support limit pushdown.
-   * @return
    */
   @Override
   @JsonIgnore
@@ -144,12 +141,12 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
   }
 
   /**
-   * By default, return null to indicate rowcount based prune is not supported.
-   * Each groupscan subclass should override, if it supports rowcount based prune.
+   * By default, return null to indicate row count based prune is not supported.
+   * Each group scan subclass should override, if it supports row count based prune.
    */
   @Override
   @JsonIgnore
-  public GroupScan applyLimit(long maxRecords) {
+  public GroupScan applyLimit(int maxRecords) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index d42680a..fc63c77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -40,16 +40,16 @@ public interface GroupScan extends Scan, HasAffinity{
    *                             2) NULL is interpreted as ALL_COLUMNS.
    *  How to handle skipAll query is up to each storage plugin, with different policy in corresponding RecordReader.
    */
-  public static final List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);
+  List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);
 
-  public static final long NO_COLUMN_STATS = -1;
+  long NO_COLUMN_STATS = -1;
 
-  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
+  void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
 
-  public abstract SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
+  SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
 
   @JsonIgnore
-  public int getMaxParallelizationWidth();
+  int getMaxParallelizationWidth();
 
   /**
    * At minimum, the GroupScan requires these many fragments to run.
@@ -57,7 +57,7 @@ public interface GroupScan extends Scan, HasAffinity{
    * @return the minimum number of fragments that should run
    */
   @JsonIgnore
-  public int getMinParallelizationWidth();
+  int getMinParallelizationWidth();
 
   /**
    * Check if GroupScan enforces width to be maximum parallelization width.
@@ -69,50 +69,50 @@ public interface GroupScan extends Scan, HasAffinity{
    */
   @JsonIgnore
   @Deprecated
-  public boolean enforceWidth();
+  boolean enforceWidth();
 
   /**
    * Returns a signature of the {@link GroupScan} which should usually be composed of
    * all its attributes which could describe it uniquely.
    */
   @JsonIgnore
-  public abstract String getDigest();
+  String getDigest();
 
   @JsonIgnore
-  public ScanStats getScanStats(PlannerSettings settings);
+  ScanStats getScanStats(PlannerSettings settings);
 
   /**
    * Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .
    */
-  public GroupScan clone(List<SchemaPath> columns);
+  GroupScan clone(List<SchemaPath> columns);
 
   /**
    * GroupScan should check the list of columns, and see if it could support all the columns in the list.
    */
-  public boolean canPushdownProjects(List<SchemaPath> columns);
+  boolean canPushdownProjects(List<SchemaPath> columns);
 
   /**
    * Return the number of non-null value in the specified column. Raise exception, if groupscan does not
    * have exact column row count.
    */
-  public long getColumnValueCount(SchemaPath column);
+  long getColumnValueCount(SchemaPath column);
 
   /**
    * Whether or not this GroupScan supports pushdown of partition filters (directories for filesystems)
    */
-  public boolean supportsPartitionFilterPushdown();
+  boolean supportsPartitionFilterPushdown();
 
   /**
    * Returns a list of columns that can be used for partition pruning
    *
    */
   @JsonIgnore
-  public List<SchemaPath> getPartitionColumns();
+  List<SchemaPath> getPartitionColumns();
 
   /**
    * Whether or not this GroupScan supports limit pushdown
    */
-  public boolean supportsLimitPushdown();
+  boolean supportsLimitPushdown();
 
   /**
    * Apply rowcount based prune for "LIMIT n" query.
@@ -120,18 +120,18 @@ public interface GroupScan extends Scan, HasAffinity{
    * @return  a new instance of group scan if the prune is successful.
    *          null when either if row-based prune is not supported, or if prune is not successful.
    */
-  public GroupScan applyLimit(long maxRecords);
+  GroupScan applyLimit(int maxRecords);
 
   /**
    * Return true if this GroupScan can return its selection as a list of file names (retrieved by getFiles()).
    */
   @JsonIgnore
-  public boolean hasFiles();
+  boolean hasFiles();
 
   /**
    * Returns a collection of file names associated with this GroupScan. This should be called after checking
    * hasFiles().  If this GroupScan cannot provide file names, it returns null.
    */
-  public Collection<String> getFiles();
+  Collection<String> getFiles();
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index aa001f9..21dab5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -1189,7 +1189,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @Override
-  public GroupScan applyLimit(long maxRecords) {
+  public GroupScan applyLimit(int maxRecords) {
     Preconditions.checkArgument(rowGroupInfos.size() >= 0);
 
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
index c2f213d..a85d7fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
@@ -41,15 +41,23 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
+  public static final int DEFAULT_RECORDS_PER_BATCH = 4000;
 
+  @JsonProperty private final int recordsPerBatch;
   @JsonProperty protected final List<T> records;
+
   protected List<PojoWriter> writers;
 
   private Iterator<T> currentIterator;
   private OperatorContext operatorContext;
 
   protected AbstractPojoRecordReader(List<T> records) {
+    this(records, DEFAULT_RECORDS_PER_BATCH);
+  }
+
+  protected AbstractPojoRecordReader(List<T> records, int recordsPerBatch) {
     this.records = records;
+    this.recordsPerBatch = Math.min(recordsPerBatch, DEFAULT_RECORDS_PER_BATCH);
   }
 
   @Override
@@ -65,7 +73,7 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i
     injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
 
     int recordCount = 0;
-    while (currentIterator.hasNext()) {
+    while (currentIterator.hasNext() && recordCount < recordsPerBatch) {
       if (!allocated) {
         allocate();
         allocated = true;
@@ -88,7 +96,7 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i
   @Override
   public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
     for (final ValueVector v : vectorMap.values()) {
-      AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+      AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
index 167c721..fef4556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -45,12 +45,16 @@ import java.util.Map;
 public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>> {
 
   @JsonProperty
-  private final LinkedHashMap<String, Class<?>> schema;
+  private LinkedHashMap<String, Class<?>> schema;
 
   public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
     super(records);
-    Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed.");
-    this.schema = schema;
+    validateAndSetSchema(schema);
+  }
+
+  public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records, int maxRecordsToRead) {
+    super(records, maxRecordsToRead);
+    validateAndSetSchema(schema);
   }
 
   /**
@@ -80,12 +84,16 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
         "}";
   }
 
+  private void validateAndSetSchema(LinkedHashMap<String, Class<?>> schema) {
+    Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed.");
+    this.schema = schema;
+  }
+
   /**
    * An utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
    * to DynamicPojoRecordReader during physical plan fragment deserialization.
    */
-  public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader>
-  {
+  public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader> {
     private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
         new TypeReference<LinkedHashMap<String, Class<?>>>() {};
 
@@ -105,7 +113,8 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
       for (Class<?> fieldType : schema.values()) {
         records.add(mapper.convertValue(recordsIterator.next(), fieldType));
       }
-      return new DynamicPojoRecordReader(schema, Collections.singletonList(records));
+      int maxRecordsToRead = value.get("recordsPerBatch").asInt();
+      return new DynamicPojoRecordReader(schema, Collections.singletonList(records), maxRecordsToRead);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index c3b6883..3546c73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -43,6 +43,12 @@ public class PojoRecordReader<T> extends AbstractPojoRecordReader<T> {
     this.fields = new ArrayList<>();
   }
 
+  public PojoRecordReader(Class<T> pojoClass, List<T> records, int maxRecordToRead) {
+    super(records, maxRecordToRead);
+    this.pojoClass = pojoClass;
+    this.fields = new ArrayList<>();
+  }
+
   /**
    * Creates writers based on pojo field class types. Ignores static fields.
    *

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
index eef6604..09cc715 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -37,24 +37,25 @@ public class ProfileInfoIterator extends ProfileIterator {
 
   private final Iterator<ProfileInfo> itr;
 
-  public ProfileInfoIterator(ExecutorFragmentContext context) {
-    super(context);
-    itr = iterateProfileInfo();
+  public ProfileInfoIterator(ExecutorFragmentContext context, int maxRecords) {
+    super(context, maxRecords);
+    this.itr = iterateProfileInfo();
+  }
+
+  @Override
+  protected Iterator<Entry<String, QueryProfile>> getProfiles(int skip, int take) {
+    return profileStoreContext
+      .getCompletedProfileStore()
+      .getRange(skip, take);
   }
 
   //Returns an iterator for authorized profiles
   private Iterator<ProfileInfo> iterateProfileInfo() {
     try {
       //Transform authorized profiles to iterator for ProfileInfo
-      return transform(
-          getAuthorizedProfiles(
-            profileStoreContext
-              .getCompletedProfileStore()
-              .getAll(),
-            queryingUsername, isAdmin));
-
+      return transform(getAuthorizedProfiles(queryingUsername, isAdmin));
     } catch (Exception e) {
-      logger.error(e.getMessage());
+      logger.error(e.getMessage(), e);
       return Iterators.singletonIterator(ProfileInfo.getDefault());
     }
   }
@@ -109,7 +110,7 @@ public class ProfileInfoIterator extends ProfileIterator {
   }
 
   public static class ProfileInfo {
-    private static final String UnknownValue = "N/A";
+    private static final String UNKNOWN_VALUE = "N/A";
 
     private static final ProfileInfo DEFAULT = new ProfileInfo();
 
@@ -144,14 +145,16 @@ public class ProfileInfoIterator extends ProfileIterator {
     }
 
     private ProfileInfo() {
-      this(UnknownValue, new Timestamp(0), UnknownValue, 0L, UnknownValue, UnknownValue, 0L, 0L, 0L, UnknownValue, UnknownValue);
+      this(UNKNOWN_VALUE, new Timestamp(0), UNKNOWN_VALUE, 0L,
+          UNKNOWN_VALUE, UNKNOWN_VALUE, 0L, 0L,
+          0L, UNKNOWN_VALUE, UNKNOWN_VALUE);
     }
 
     /**
      * If unable to get ProfileInfo, use this default instance instead.
      * @return the default instance
      */
-    public static final ProfileInfo getDefault() {
+    public static ProfileInfo getDefault() {
       return DEFAULT;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
index 6112eb1..78d46d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -33,47 +34,76 @@ import org.apache.drill.exec.util.ImpersonationUtil;
  * Base class for Profile Iterators
  */
 public abstract class ProfileIterator implements Iterator<Object> {
+
+  private static final int DEFAULT_NUMBER_OF_PROFILES_TO_FETCH = 4000;
+
   protected final QueryProfileStoreContext profileStoreContext;
   protected final String queryingUsername;
   protected final boolean isAdmin;
 
-  public ProfileIterator(ExecutorFragmentContext context) {
-    //Grab profile Store Context
-    profileStoreContext = context.getProfileStoreContext();
+  private final int maxRecords;
 
-    queryingUsername = context.getQueryUserName();
-    isAdmin = hasAdminPrivileges(context);
+  protected ProfileIterator(ExecutorFragmentContext context, int maxRecords) {
+    this.profileStoreContext = context.getProfileStoreContext();
+    this.queryingUsername = context.getQueryUserName();
+    this.isAdmin = hasAdminPrivileges(context);
+    this.maxRecords = maxRecords;
   }
 
-  protected boolean hasAdminPrivileges(ExecutorFragmentContext context) {
-    OptionManager options = context.getOptions();
-    if (context.isUserAuthenticationEnabled() &&
-        !ImpersonationUtil.hasAdminPrivileges(
-          context.getQueryUserName(),
-          ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
-          ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
-      return false;
+  //Returns an iterator for authorized profiles
+  protected Iterator<Entry<String, QueryProfile>> getAuthorizedProfiles(String username, boolean isAdministrator) {
+    if (maxRecords == 0) {
+      return Collections.emptyIterator();
     }
 
-    //Passed checks
-    return true;
-  }
-
-  //Returns an iterator for authorized profiles
-  protected Iterator<Entry<String, QueryProfile>> getAuthorizedProfiles (
-      Iterator<Entry<String, QueryProfile>> allProfiles, String username, boolean isAdministrator) {
     if (isAdministrator) {
-      return allProfiles;
+      return getProfiles(0, maxRecords);
     }
 
-    List<Entry<String, QueryProfile>> authorizedProfiles = new LinkedList<Entry<String, QueryProfile>>();
-    while (allProfiles.hasNext()) {
-      Entry<String, QueryProfile> profileKVPair = allProfiles.next();
-      //Check if user matches
-      if (profileKVPair.getValue().getUser().equals(username)) {
-        authorizedProfiles.add(profileKVPair);
+
+    /*
+      For non-administrators getting profiles by range might not return exact number of requested profiles
+      since some extracted profiles may belong to different users.
+      In order not to extract all profiles, proceed extracting profiles based on max between default and given maxRecords
+      until number of authorized user's profiles equals to the provided limit.
+      Using max between default and given maxRecords will be helpful in case maxRecords number is low (ex: 1).
+      Reading profiles in a bulk will be much faster.
+     */
+    List<Entry<String, QueryProfile>> authorizedProfiles = new LinkedList<>();
+    int skip = 0;
+    int take = Math.max(maxRecords, DEFAULT_NUMBER_OF_PROFILES_TO_FETCH);
+
+    while (skip < Integer.MAX_VALUE) {
+      Iterator<Entry<String, QueryProfile>> profiles = getProfiles(skip, take);
+      int fetchedProfilesCount = 0;
+      while (profiles.hasNext()) {
+        fetchedProfilesCount++;
+        Entry<String, QueryProfile> profileKVPair = profiles.next();
+        // check if user matches
+        if (profileKVPair.getValue().getUser().equals(username)) {
+          authorizedProfiles.add(profileKVPair);
+          // if we have all needed authorized profiles, return iterator
+          if (authorizedProfiles.size() == maxRecords) {
+            return authorizedProfiles.iterator();
+          }
+        }
+      }
+
+      // if returned number of profiles is less than given range then there are no more profiles in the store
+      // return all found authorized profiles
+      if (fetchedProfilesCount != take) {
+        return authorizedProfiles.iterator();
+      }
+
+      try {
+        // since we request profiles in batches, define number of profiles to skip
+        // if we hit integer overflow return all found authorized profiles
+        skip = Math.addExact(skip, take);
+      } catch (ArithmeticException e) {
+        return authorizedProfiles.iterator();
       }
     }
+
     return authorizedProfiles.iterator();
   }
 
@@ -84,4 +114,34 @@ public abstract class ProfileIterator implements Iterator<Object> {
       return 0;
     }
   }
+
+  /**
+   * Returns profiles based on given range.
+   *
+   * @param skip number of profiles to skip
+   * @param take number of profiles to return
+   * @return profiles iterator
+   */
+  protected abstract Iterator<Entry<String, QueryProfile>> getProfiles(int skip, int take);
+
+  /**
+   * Checks if current user has admin privileges.
+   *
+   * @param context fragment context
+   * @return true if user is admin, false otherwise
+   */
+  private boolean hasAdminPrivileges(ExecutorFragmentContext context) {
+    OptionManager options = context.getOptions();
+    if (context.isUserAuthenticationEnabled() &&
+        !ImpersonationUtil.hasAdminPrivileges(
+            context.getQueryUserName(),
+            ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
+            ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
+      return false;
+    }
+
+    //Passed checks
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
index 67f1165..9d9f236 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -40,28 +40,26 @@ public class ProfileJsonIterator extends ProfileIterator {
   private final InstanceSerializer<QueryProfile> profileSerializer;
   private final Iterator<ProfileJson> itr;
 
-  public ProfileJsonIterator(ExecutorFragmentContext context) {
-    super(context);
+  public ProfileJsonIterator(ExecutorFragmentContext context, int maxRecords) {
+    super(context, maxRecords);
     //Holding a serializer (for JSON extract)
-    profileSerializer = profileStoreContext.
-        getProfileStoreConfig().getSerializer();
+    this.profileSerializer = profileStoreContext.getProfileStoreConfig().getSerializer();
+    this.itr = iterateProfileInfoJson();
+  }
 
-    itr = iterateProfileInfoJson();
+  @Override
+  protected Iterator<Entry<String, QueryProfile>> getProfiles(int skip, int take) {
+    return profileStoreContext.getCompletedProfileStore().getRange(skip, take);
   }
 
   //Returns an iterator for authorized profiles
   private Iterator<ProfileJson> iterateProfileInfoJson() {
     try {
       //Transform authorized profiles to iterator for ProfileInfoJson
-      return transformJson(
-          getAuthorizedProfiles(
-            profileStoreContext
-              .getCompletedProfileStore()
-              .getAll(),
-            queryingUsername, isAdmin));
+      return transformJson(getAuthorizedProfiles(queryingUsername, isAdmin));
 
     } catch (Exception e) {
-      logger.error(e.getMessage());
+      logger.debug(e.getMessage(), e);
       return Iterators.singletonIterator(ProfileJson.getDefault());
     }
   }
@@ -80,11 +78,11 @@ public class ProfileJsonIterator extends ProfileIterator {
 
         //Constructing ProfileInfo
         final String queryID = input.getKey();
-        String profileJson = null;
+        String profileJson;
         try {
           profileJson = new String(profileSerializer.serialize(input.getValue()));
         } catch (IOException e) {
-          logger.debug("Failed to serialize profile for: " + queryID);
+          logger.debug("Failed to serialize profile for: " + queryID, e);
           profileJson = "{ 'message' : 'error (unable to serialize profile: "+ queryID +")' }";
         }
 
@@ -112,7 +110,7 @@ public class ProfileJsonIterator extends ProfileIterator {
   }
 
   public static class ProfileJson {
-    private static final String UnknownValue = "N/A";
+    private static final String UNKNOWN_VALUE = "N/A";
 
     private static final ProfileJson DEFAULT = new ProfileJson();
 
@@ -125,14 +123,14 @@ public class ProfileJsonIterator extends ProfileIterator {
     }
 
     private ProfileJson() {
-      this(UnknownValue, UnknownValue);
+      this(UNKNOWN_VALUE, UNKNOWN_VALUE);
     }
 
     /**
      * If unable to get ProfileInfo, use this default instance instead.
      * @return the default instance
      */
-    public static final ProfileJson getDefault() {
+    public static ProfileJson getDefault() {
       return DEFAULT;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index a49ff9a..8882f2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,91 +26,91 @@ import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
  * An enumeration of all tables in Drill's system ("sys") schema.
  * <p>
  *   OPTION, DRILLBITS and VERSION are local tables available on every Drillbit.
- *   MEMORY and THREADS are distributed tables with one record on every
- *   Drillbit.
+ *   MEMORY and THREADS are distributed tables with one record on every Drillbit.
+ *   PROFILES and PROFILES_JSON are stored in local / distributed storage.
  * </p>
  */
 public enum SystemTable {
   OPTION("options", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_PUBLIC);
     }
   },
 
   OPTION_VAL("options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new ExtendedOptionIterator(context, false);
     }
   },
 
   INTERNAL_OPTIONS("internal_options", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_INTERNAL);
     }
   },
 
   INTERNAL_OPTIONS_VAL("internal_options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new ExtendedOptionIterator(context, true);
     }
   },
 
   BOOT("boot", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new OptionIterator(context, OptionIterator.Mode.BOOT);
     }
   },
 
   DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new DrillbitIterator(context);
     }
   },
 
   VERSION("version", false, VersionIterator.VersionInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new VersionIterator();
     }
   },
 
   MEMORY("memory", true, MemoryIterator.MemoryInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new MemoryIterator(context);
     }
   },
 
   CONNECTIONS("connections", true, BitToUserConnectionIterator.ConnectionInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new BitToUserConnectionIterator(context);
     }
   },
 
   PROFILES("profiles", false, ProfileInfoIterator.ProfileInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
-      return new ProfileInfoIterator(context);
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
+      return new ProfileInfoIterator(context, maxRecords);
     }
   },
 
   PROFILES_JSON("profiles_json", false, ProfileJsonIterator.ProfileJson.class) {
     @Override
-    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
-      return new ProfileJsonIterator(context);
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
+      return new ProfileJsonIterator(context, maxRecords);
     }
   },
 
   THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) {
     @Override
-  public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
       return new ThreadsIterator(context);
     }
   };
@@ -125,7 +125,7 @@ public enum SystemTable {
     this.pojoClass = pojoClass;
   }
 
-  public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
+  public Iterator<Object> getIterator(final ExecutorFragmentContext context, final int maxRecords) {
     throw new UnsupportedOperationException(tableName + " must override this method.");
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index c0ef0d0..facf084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -43,8 +43,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
                             final List<RecordBatch> children)
     throws ExecutionSetupException {
     final SystemTable table = scan.getTable();
-    final Iterator<Object> iterator = table.getIterator(context);
-    final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
+    final Iterator<Object> iterator = table.getIterator(context, scan.getMaxRecordsToRead());
+    final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator), scan.getMaxRecordsToRead());
 
     return new ScanBatch(scan, context, Collections.singletonList(reader));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 948aa0f..21ed028 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
@@ -68,13 +67,12 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     parent.add(schema.getName(), schema);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) {
     SystemTable table = selection.getWith(context.getLpPersistence(), SystemTable.class);
     return new SystemTableScan(table, this);
   }
@@ -87,7 +85,7 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
     private final Set<String> tableNames;
 
     public SystemSchema() {
-      super(ImmutableList.<String>of(), SYS_SCHEMA_NAME);
+      super(ImmutableList.of(), SYS_SCHEMA_NAME);
       Set<String> names = Sets.newHashSet();
       for (SystemTable t : SystemTable.values()) {
         names.add(t.getTableName());

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index b77ed23..5f8eb30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -31,7 +30,6 @@ import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -48,20 +46,23 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
 
   private final SystemTable table;
   private final SystemTablePlugin plugin;
+  private int maxRecordsToRead;
 
   @JsonCreator
-  public SystemTableScan( //
-      @JsonProperty("table") SystemTable table, //
-      @JacksonInject StoragePluginRegistry engineRegistry //
-  ) throws IOException, ExecutionSetupException {
-    super((String)null);
-    this.table = table;
-    this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
+  public SystemTableScan(@JsonProperty("table") SystemTable table,
+                         @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
+                         @JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException {
+    this(table, maxRecordsToRead, (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE));
   }
 
   public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
-    super((String)null);
+    this(table, Integer.MAX_VALUE, plugin);
+  }
+
+  public SystemTableScan(SystemTable table, int maxRecordsToRead, SystemTablePlugin plugin) {
+    super((String) null);
     this.table = table;
+    this.maxRecordsToRead = maxRecordsToRead;
     this.plugin = plugin;
   }
 
@@ -75,16 +76,16 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new SystemTableScan(table, plugin);
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new SystemTableScan(table, maxRecordsToRead, plugin);
   }
 
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+  public SubScan getSpecificScan(int minorFragmentId) {
     return this;
   }
 
@@ -110,10 +111,23 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
     return maxAllocation;
   }
 
+  /**
+   * Example: SystemTableScan [table=OPTION, distributed=false, maxRecordsToRead=1]
+   *
+   * @return digest naming the table, its distributed flag and, when set, the record limit
+   */
   @Override
   public String getDigest() {
-    return "SystemTableScan [table=" + table.name() +
-      ", distributed=" + table.isDistributed() + "]";
+    // The record limit is printed only when it differs from the Integer.MAX_VALUE default.
+    boolean limited = maxRecordsToRead != Integer.MAX_VALUE;
+    StringBuilder digest = new StringBuilder("SystemTableScan [");
+    digest.append("table=").append(table.name()).append(", ");
+    digest.append("distributed=").append(table.isDistributed());
+    if (limited) {
+      digest.append(", maxRecordsToRead=").append(maxRecordsToRead);
+    }
+    digest.append("]");
+    return digest.toString();
   }
 
   @Override
@@ -152,10 +166,31 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
     return this;
   }
 
+  /** Builds a new scan over the given scan's table and plugin with the supplied record limit. */
+  public GroupScan clone(SystemTableScan systemTableScan, int maxRecordsToRead) {
+    return new SystemTableScan(systemTableScan.getTable(), maxRecordsToRead, systemTableScan.getPlugin());
+  }
+
+  /** Always reports true: this scan accepts a pushed-down record limit. */
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  /** Returns a copy limited to maxRecords, or null when this scan already carries that limit. */
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    return maxRecords == this.maxRecordsToRead ? null : clone(this, maxRecords);
+  }
+
   public SystemTable getTable() {
     return table;
   }
 
+  public int getMaxRecordsToRead() {
+    return maxRecordsToRead;
+  }
+
   @JsonIgnore
   public SystemTablePlugin getPlugin() {
     return plugin;

http://git-wip-us.apache.org/repos/asf/drill/blob/8663e8a5/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index 62f5f4f..4dd09f7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,18 +17,17 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-public class TestSystemTable extends BaseTestQuery {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
+public class TestSystemTable extends PlanTestBase {
 
   @BeforeClass
-  public static void setupMultiNodeCluster() throws Exception {
+  public static void setupMultiNodeCluster() {
     updateTestCluster(3, null);
   }
 
@@ -84,4 +83,11 @@ public class TestSystemTable extends BaseTestQuery {
   public void profilesJsonTable() throws Exception {
     test("select * from sys.profiles_json");
   }
+
+  /** Verifies that the plan for a LIMIT 10 query on sys.profiles contains maxRecordsToRead=10. */
+  @Test
+  public void testProfilesLimitPushDown() throws Exception {
+    String limitPattern = "maxRecordsToRead=10";
+    testPlanMatchingPatterns("select * from sys.profiles limit 10", new String[] {limitPattern}, new String[] {});
+  }
 }