You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pr...@apache.org on 2018/02/25 04:21:52 UTC

zeppelin git commit: [ZEPPELIN-3249] Add support for streaming table

Repository: zeppelin
Updated Branches:
  refs/heads/master 066016437 -> 720537aab


[ZEPPELIN-3249] Add support for streaming table

### What is this PR for?
Since Zeppelin support streaming from various backends, I think it will be useful if even tables (and later graphs) can also be streamed.

### What type of PR is it?
[Improvement | Feature]

### Todos
* [x] - At times it fails with `Uncaught TypeError: Cannot read property 'p20180220_113300_1663645286_0_table_gridApi' of undefined`, have to fix this error.

### What is the Jira issue?
* [ZEPPELIN-3249](https://issues.apache.org/jira/browse/ZEPPELIN-3249)

### How should this be tested?
I have done it using shell interpreter, but this should work for all other backends as well
```
%sh
echo "%table"
echo "Col1	Col2"
echo "1	2"
sleep 1
echo "3	4"
echo "5	6"
sleep 2
echo "7	8"
sleep 3
echo "9	10"
echo "11	12"
sleep 4
echo "12	13"
```

### Screenshots (if appropriate)
![zeppelin-3249](https://user-images.githubusercontent.com/674497/36419469-fa4c26e0-1657-11e8-8a8a-717b29d91771.gif)

### Questions:
* Does the licenses files need update? N/A
* Is there breaking changes for older versions? N/A
* Does this needs documentation? N/A

Author: Prabhjyot Singh <pr...@gmail.com>

Closes #2809 from prabhjyotsingh/ZEPPELIN-3249 and squashes the following commits:

463599d [Prabhjyot Singh] Merge remote-tracking branch 'origin/master' into ZEPPELIN-3249
568f334 [Prabhjyot Singh] fallback option for persistedTableOption don't commit viz if paragraph is in running or pending state
e6bce80 [Prabhjyot Singh] fix undefined
6ce5d18 [Prabhjyot Singh] concat not required all time
21564fc [Prabhjyot Singh] Add support for streaming table


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/720537aa
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/720537aa
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/720537aa

Branch: refs/heads/master
Commit: 720537aab99229309fb7b00c3f6006f0fea4a779
Parents: 0660164
Author: Prabhjyot Singh <pr...@gmail.com>
Authored: Fri Feb 23 10:43:12 2018 +0530
Committer: Prabhjyot Singh <pr...@gmail.com>
Committed: Sun Feb 25 09:51:36 2018 +0530

----------------------------------------------------------------------
 .../InterpreterResultMessageOutput.java         |   2 +-
 .../src/app/notebook/notebook.controller.js     |  23 +++--
 .../paragraph/result/result.controller.js       | 100 +++++++++++++------
 .../builtins/visualization-table.js             |  40 +++++++-
 .../builtins/visualization-util.js              |  11 +-
 5 files changed, 131 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
index da31364..8758c98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
@@ -232,7 +232,7 @@ public class InterpreterResultMessageOutput extends OutputStream {
   }
 
   public boolean isAppendSupported() {
-    return type == InterpreterResult.Type.TEXT;
+    return type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE;
   }
 
   private void copyStream(InputStream in, OutputStream out) throws IOException {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/notebook/notebook.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js
index 05ab9fb..4c9de9c 100644
--- a/zeppelin-web/src/app/notebook/notebook.controller.js
+++ b/zeppelin-web/src/app/notebook/notebook.controller.js
@@ -278,16 +278,19 @@ function NotebookCtrl($scope, $route, $routeParams, $location, $rootScope,
   $scope.$on('listRevisionHistory', function(event, data) {
     console.debug('received list of revisions %o', data);
     $scope.noteRevisions = data.revisionList;
-    if ($scope.noteRevisions.length === 0 || $scope.noteRevisions[0].id !== 'Head') {
-      $scope.noteRevisions.splice(0, 0, {
-        id: 'Head',
-        message: 'Head',
-      });
-    }
-    if ($routeParams.revisionId) {
-      let index = _.findIndex($scope.noteRevisions, {'id': $routeParams.revisionId});
-      if (index > -1) {
-        $scope.currentRevision = $scope.noteRevisions[index].message;
+    if ($scope.noteRevisions) {
+      if ($scope.noteRevisions.length === 0 || $scope.noteRevisions[0].id !== 'Head') {
+        $scope.noteRevisions.splice(0, 0, {
+          id: 'Head',
+          message: 'Head',
+        });
+      }
+      if ($routeParams.revisionId) {
+        let index = _.findIndex($scope.noteRevisions,
+          {'id': $routeParams.revisionId});
+        if (index > -1) {
+          $scope.currentRevision = $scope.noteRevisions[index].message;
+        }
       }
     }
   });

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
index 5bf77dc..29465e5 100644
--- a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
@@ -243,10 +243,22 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
     if (paragraph.id === data.paragraphId &&
       resultIndex === data.index &&
       (paragraph.status === ParagraphStatus.PENDING || paragraph.status === ParagraphStatus.RUNNING)) {
-      if (DefaultDisplayType.TEXT !== $scope.type) {
+      // Check if result type is eiter TEXT or TABLE, if not then treat it like TEXT
+      if ([DefaultDisplayType.TEXT, DefaultDisplayType.TABLE].indexOf($scope.type) < 0) {
         $scope.type = DefaultDisplayType.TEXT;
       }
-      appendTextOutput(data.data);
+      if ($scope.type === DefaultDisplayType.TEXT) {
+        appendTextOutput(data.data);
+      } else if ($scope.type === DefaultDisplayType.TABLE) {
+        appendTableOutput(data);
+      }
+    }
+    if (paragraph.id === data.paragraphId &&
+      resultIndex === data.index &&
+      paragraph.status === ParagraphStatus.FINISHED) {
+      if ($scope.type === DefaultDisplayType.TABLE) {
+        appendTableOutput(data);
+      }
     }
   });
 
@@ -531,6 +543,39 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
     }
   };
 
+  function appendTableOutput(data) {
+    if (!$scope.$parent.result.data) {
+      $scope.$parent.result.data = [];
+      tableData = undefined;
+    }
+    if (!$scope.$parent.result.data[data.index]) {
+      $scope.$parent.result.data[data.index] = '';
+    }
+    if (!tableData) {
+      $scope.$parent.result.data[data.index] = $scope.$parent.result.data[data.index].concat(data.data);
+      $rootScope.$broadcast(
+        'updateResult',
+        {'data': $scope.$parent.result.data[data.index], 'type': 'TABLE'},
+        undefined,
+        paragraph,
+        data.index);
+      let elemId = `p${$scope.id}_table`;
+      renderGraph(elemId, 'table', true);
+    } else {
+      let textRows = data.data.split('\n');
+      for (let i = 0; i < textRows.length; i++) {
+        if (textRows[i] !== '') {
+          let row = textRows[i].split('\t');
+          tableData.rows.push(row);
+          let builtInViz = builtInVisualizations['table'];
+          if (builtInViz.instance !== undefined) {
+            builtInViz.instance.append([row], tableData.columns);
+          }
+        }
+      }
+    }
+  }
+
   function appendTextOutput(data) {
     const elemId = getTextResultElemId($scope.id);
     textResultQueueForAppend.push(data);
@@ -744,33 +789,32 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
   };
 
   const commitVizConfigChange = function(config, vizId) {
-    let newConfig = angular.copy($scope.config);
-    if (!newConfig.graph) {
-      newConfig.graph = {};
-    }
-
-    // copy setting for vizId
-    if (!newConfig.graph.setting) {
-      newConfig.graph.setting = {};
-    }
-    newConfig.graph.setting[vizId] = angular.copy(config);
-
-    // copy common setting
-    if (newConfig.graph.setting[vizId]) {
-      newConfig.graph.commonSetting = newConfig.graph.setting[vizId].common;
-      delete newConfig.graph.setting[vizId].common;
-    }
-
-    // copy pivot setting
-    if (newConfig.graph.commonSetting && newConfig.graph.commonSetting.pivot) {
-      newConfig.graph.keys = newConfig.graph.commonSetting.pivot.keys;
-      newConfig.graph.groups = newConfig.graph.commonSetting.pivot.groups;
-      newConfig.graph.values = newConfig.graph.commonSetting.pivot.values;
-      delete newConfig.graph.commonSetting.pivot;
+    if ([ParagraphStatus.RUNNING, ParagraphStatus.PENDING].indexOf(paragraph.status) < 0) {
+      let newConfig = angular.copy($scope.config);
+      if (!newConfig.graph) {
+        newConfig.graph = {};
+      }
+      // copy setting for vizId
+      if (!newConfig.graph.setting) {
+        newConfig.graph.setting = {};
+      }
+      newConfig.graph.setting[vizId] = angular.copy(config);
+      // copy common setting
+      if (newConfig.graph.setting[vizId]) {
+        newConfig.graph.commonSetting = newConfig.graph.setting[vizId].common;
+        delete newConfig.graph.setting[vizId].common;
+      }
+      // copy pivot setting
+      if (newConfig.graph.commonSetting && newConfig.graph.commonSetting.pivot) {
+        newConfig.graph.keys = newConfig.graph.commonSetting.pivot.keys;
+        newConfig.graph.groups = newConfig.graph.commonSetting.pivot.groups;
+        newConfig.graph.values = newConfig.graph.commonSetting.pivot.values;
+        delete newConfig.graph.commonSetting.pivot;
+      }
+      console.debug('committVizConfig', newConfig);
+      let newParams = angular.copy(paragraph.settings.params);
+      commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
     }
-    console.debug('committVizConfig', newConfig);
-    let newParams = angular.copy(paragraph.settings.params);
-    commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
   };
 
   $scope.$on('paragraphResized', function(event, paragraphId) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/visualization/builtins/visualization-table.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/visualization/builtins/visualization-table.js b/zeppelin-web/src/app/visualization/builtins/visualization-table.js
index d77efbc..723bb3a 100644
--- a/zeppelin-web/src/app/visualization/builtins/visualization-table.js
+++ b/zeppelin-web/src/app/visualization/builtins/visualization-table.js
@@ -16,12 +16,19 @@ import Visualization from '../visualization';
 import PassthroughTransformation from '../../tabledata/passthrough';
 
 import {
-  Widget, ValueType,
-  isInputWidget, isOptionWidget, isCheckboxWidget,
-  isTextareaWidget, isBtnGroupWidget,
-  initializeTableConfig, resetTableOptionConfig,
-  DefaultTableColumnType, TableColumnType, updateColumnTypeState,
+  DefaultTableColumnType,
+  initializeTableConfig,
+  isBtnGroupWidget,
+  isCheckboxWidget,
+  isInputWidget,
+  isOptionWidget,
+  isTextareaWidget,
   parseTableOption,
+  resetTableOptionConfig,
+  TableColumnType,
+  updateColumnTypeState,
+  ValueType,
+  Widget,
 } from './visualization-util';
 
 const SETTING_TEMPLATE = require('./visualization-table-setting.html');
@@ -247,6 +254,29 @@ export default class TableVisualization extends Visualization {
     gridOptions.enableSelectionBatchEvent = false;
   }
 
+  append(row, columns) {
+    const gridOptions = this.getGridOptions();
+    this.setDynamicGridOptions(gridOptions, this.config);
+    // this.refreshGrid()
+    const gridElemId = this.getGridElemId();
+    const gridElem = angular.element(`#${gridElemId}`);
+
+    if (gridElem) {
+      const scope = this.getScope();
+
+      const columnNames = columns.map((c) => c.name);
+      let gridData = row.map((r) => {
+        return columnNames.reduce((acc, colName, index) => {
+          acc[colName] = r[index];
+          return acc;
+        }, {});
+      });
+      gridData.map((data) => {
+        scope[gridElemId].data.push(data);
+      });
+    }
+  }
+
   render(tableData) {
     const gridElemId = this.getGridElemId();
     let gridElem = document.getElementById(gridElemId);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/visualization/builtins/visualization-util.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/visualization/builtins/visualization-util.js b/zeppelin-web/src/app/visualization/builtins/visualization-util.js
index a82a18e..7feb129 100644
--- a/zeppelin-web/src/app/visualization/builtins/visualization-util.js
+++ b/zeppelin-web/src/app/visualization/builtins/visualization-util.js
@@ -100,7 +100,16 @@ export function initializeTableConfig(config, tableOptionSpecs) {
 
 export function parseTableOption(specs, persistedTableOption) {
   /** copy original params */
-  const parsed = JSON.parse(JSON.stringify(persistedTableOption));
+  let parsed;
+  try {
+    parsed = JSON.parse(JSON.stringify(persistedTableOption));
+  } catch (e) {
+    // if not able to parse fall back to default values coming from specs
+    parsed = {};
+    for (let spec of specs) {
+      parsed[spec['name']] = spec['defaultValue'];
+    }
+  }
 
   for (let i = 0; i < specs.length; i++) {
     const s = specs[i];