You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pr...@apache.org on 2018/02/25 04:21:52 UTC
zeppelin git commit: [ZEPPELIN-3249] Add support for streaming table
Repository: zeppelin
Updated Branches:
refs/heads/master 066016437 -> 720537aab
[ZEPPELIN-3249] Add support for streaming table
### What is this PR for?
Since Zeppelin support streaming from various backends, I think it will be useful if even tables (and later graphs) can also be streamed.
### What type of PR is it?
[Improvement | Feature]
### Todos
* [x] - At times it fails with `Uncaught TypeError: Cannot read property 'p20180220_113300_1663645286_0_table_gridApi' of undefined`, have to fix this error.
### What is the Jira issue?
* [ZEPPELIN-3249](https://issues.apache.org/jira/browse/ZEPPELIN-3249)
### How should this be tested?
I have done it using shell interpreter, but this should work for all other backends as well
```
%sh
echo "%table"
echo "Col1 Col2"
echo "1 2"
sleep 1
echo "3 4"
echo "5 6"
sleep 2
echo "7 8"
sleep 3
echo "9 10"
echo "11 12"
sleep 4
echo "12 13"
```
### Screenshots (if appropriate)
![zeppelin-3249](https://user-images.githubusercontent.com/674497/36419469-fa4c26e0-1657-11e8-8a8a-717b29d91771.gif)
### Questions:
* Does the licenses files need update? N/A
* Is there breaking changes for older versions? N/A
* Does this needs documentation? N/A
Author: Prabhjyot Singh <pr...@gmail.com>
Closes #2809 from prabhjyotsingh/ZEPPELIN-3249 and squashes the following commits:
463599d [Prabhjyot Singh] Merge remote-tracking branch 'origin/master' into ZEPPELIN-3249
568f334 [Prabhjyot Singh] fallback option for persistedTableOption don't commit viz if paragraph is in running or pending state
e6bce80 [Prabhjyot Singh] fix undefined
6ce5d18 [Prabhjyot Singh] concat not required all time
21564fc [Prabhjyot Singh] Add support for streaming table
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/720537aa
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/720537aa
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/720537aa
Branch: refs/heads/master
Commit: 720537aab99229309fb7b00c3f6006f0fea4a779
Parents: 0660164
Author: Prabhjyot Singh <pr...@gmail.com>
Authored: Fri Feb 23 10:43:12 2018 +0530
Committer: Prabhjyot Singh <pr...@gmail.com>
Committed: Sun Feb 25 09:51:36 2018 +0530
----------------------------------------------------------------------
.../InterpreterResultMessageOutput.java | 2 +-
.../src/app/notebook/notebook.controller.js | 23 +++--
.../paragraph/result/result.controller.js | 100 +++++++++++++------
.../builtins/visualization-table.js | 40 +++++++-
.../builtins/visualization-util.js | 11 +-
5 files changed, 131 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
index da31364..8758c98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
@@ -232,7 +232,7 @@ public class InterpreterResultMessageOutput extends OutputStream {
}
public boolean isAppendSupported() {
- return type == InterpreterResult.Type.TEXT;
+ return type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE;
}
private void copyStream(InputStream in, OutputStream out) throws IOException {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/notebook/notebook.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js
index 05ab9fb..4c9de9c 100644
--- a/zeppelin-web/src/app/notebook/notebook.controller.js
+++ b/zeppelin-web/src/app/notebook/notebook.controller.js
@@ -278,16 +278,19 @@ function NotebookCtrl($scope, $route, $routeParams, $location, $rootScope,
$scope.$on('listRevisionHistory', function(event, data) {
console.debug('received list of revisions %o', data);
$scope.noteRevisions = data.revisionList;
- if ($scope.noteRevisions.length === 0 || $scope.noteRevisions[0].id !== 'Head') {
- $scope.noteRevisions.splice(0, 0, {
- id: 'Head',
- message: 'Head',
- });
- }
- if ($routeParams.revisionId) {
- let index = _.findIndex($scope.noteRevisions, {'id': $routeParams.revisionId});
- if (index > -1) {
- $scope.currentRevision = $scope.noteRevisions[index].message;
+ if ($scope.noteRevisions) {
+ if ($scope.noteRevisions.length === 0 || $scope.noteRevisions[0].id !== 'Head') {
+ $scope.noteRevisions.splice(0, 0, {
+ id: 'Head',
+ message: 'Head',
+ });
+ }
+ if ($routeParams.revisionId) {
+ let index = _.findIndex($scope.noteRevisions,
+ {'id': $routeParams.revisionId});
+ if (index > -1) {
+ $scope.currentRevision = $scope.noteRevisions[index].message;
+ }
}
}
});
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
index 5bf77dc..29465e5 100644
--- a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
@@ -243,10 +243,22 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
if (paragraph.id === data.paragraphId &&
resultIndex === data.index &&
(paragraph.status === ParagraphStatus.PENDING || paragraph.status === ParagraphStatus.RUNNING)) {
- if (DefaultDisplayType.TEXT !== $scope.type) {
+ // Check if result type is eiter TEXT or TABLE, if not then treat it like TEXT
+ if ([DefaultDisplayType.TEXT, DefaultDisplayType.TABLE].indexOf($scope.type) < 0) {
$scope.type = DefaultDisplayType.TEXT;
}
- appendTextOutput(data.data);
+ if ($scope.type === DefaultDisplayType.TEXT) {
+ appendTextOutput(data.data);
+ } else if ($scope.type === DefaultDisplayType.TABLE) {
+ appendTableOutput(data);
+ }
+ }
+ if (paragraph.id === data.paragraphId &&
+ resultIndex === data.index &&
+ paragraph.status === ParagraphStatus.FINISHED) {
+ if ($scope.type === DefaultDisplayType.TABLE) {
+ appendTableOutput(data);
+ }
}
});
@@ -531,6 +543,39 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
}
};
+ function appendTableOutput(data) {
+ if (!$scope.$parent.result.data) {
+ $scope.$parent.result.data = [];
+ tableData = undefined;
+ }
+ if (!$scope.$parent.result.data[data.index]) {
+ $scope.$parent.result.data[data.index] = '';
+ }
+ if (!tableData) {
+ $scope.$parent.result.data[data.index] = $scope.$parent.result.data[data.index].concat(data.data);
+ $rootScope.$broadcast(
+ 'updateResult',
+ {'data': $scope.$parent.result.data[data.index], 'type': 'TABLE'},
+ undefined,
+ paragraph,
+ data.index);
+ let elemId = `p${$scope.id}_table`;
+ renderGraph(elemId, 'table', true);
+ } else {
+ let textRows = data.data.split('\n');
+ for (let i = 0; i < textRows.length; i++) {
+ if (textRows[i] !== '') {
+ let row = textRows[i].split('\t');
+ tableData.rows.push(row);
+ let builtInViz = builtInVisualizations['table'];
+ if (builtInViz.instance !== undefined) {
+ builtInViz.instance.append([row], tableData.columns);
+ }
+ }
+ }
+ }
+ }
+
function appendTextOutput(data) {
const elemId = getTextResultElemId($scope.id);
textResultQueueForAppend.push(data);
@@ -744,33 +789,32 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
};
const commitVizConfigChange = function(config, vizId) {
- let newConfig = angular.copy($scope.config);
- if (!newConfig.graph) {
- newConfig.graph = {};
- }
-
- // copy setting for vizId
- if (!newConfig.graph.setting) {
- newConfig.graph.setting = {};
- }
- newConfig.graph.setting[vizId] = angular.copy(config);
-
- // copy common setting
- if (newConfig.graph.setting[vizId]) {
- newConfig.graph.commonSetting = newConfig.graph.setting[vizId].common;
- delete newConfig.graph.setting[vizId].common;
- }
-
- // copy pivot setting
- if (newConfig.graph.commonSetting && newConfig.graph.commonSetting.pivot) {
- newConfig.graph.keys = newConfig.graph.commonSetting.pivot.keys;
- newConfig.graph.groups = newConfig.graph.commonSetting.pivot.groups;
- newConfig.graph.values = newConfig.graph.commonSetting.pivot.values;
- delete newConfig.graph.commonSetting.pivot;
+ if ([ParagraphStatus.RUNNING, ParagraphStatus.PENDING].indexOf(paragraph.status) < 0) {
+ let newConfig = angular.copy($scope.config);
+ if (!newConfig.graph) {
+ newConfig.graph = {};
+ }
+ // copy setting for vizId
+ if (!newConfig.graph.setting) {
+ newConfig.graph.setting = {};
+ }
+ newConfig.graph.setting[vizId] = angular.copy(config);
+ // copy common setting
+ if (newConfig.graph.setting[vizId]) {
+ newConfig.graph.commonSetting = newConfig.graph.setting[vizId].common;
+ delete newConfig.graph.setting[vizId].common;
+ }
+ // copy pivot setting
+ if (newConfig.graph.commonSetting && newConfig.graph.commonSetting.pivot) {
+ newConfig.graph.keys = newConfig.graph.commonSetting.pivot.keys;
+ newConfig.graph.groups = newConfig.graph.commonSetting.pivot.groups;
+ newConfig.graph.values = newConfig.graph.commonSetting.pivot.values;
+ delete newConfig.graph.commonSetting.pivot;
+ }
+ console.debug('committVizConfig', newConfig);
+ let newParams = angular.copy(paragraph.settings.params);
+ commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
}
- console.debug('committVizConfig', newConfig);
- let newParams = angular.copy(paragraph.settings.params);
- commitParagraphResult(paragraph.title, paragraph.text, newConfig, newParams);
};
$scope.$on('paragraphResized', function(event, paragraphId) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/visualization/builtins/visualization-table.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/visualization/builtins/visualization-table.js b/zeppelin-web/src/app/visualization/builtins/visualization-table.js
index d77efbc..723bb3a 100644
--- a/zeppelin-web/src/app/visualization/builtins/visualization-table.js
+++ b/zeppelin-web/src/app/visualization/builtins/visualization-table.js
@@ -16,12 +16,19 @@ import Visualization from '../visualization';
import PassthroughTransformation from '../../tabledata/passthrough';
import {
- Widget, ValueType,
- isInputWidget, isOptionWidget, isCheckboxWidget,
- isTextareaWidget, isBtnGroupWidget,
- initializeTableConfig, resetTableOptionConfig,
- DefaultTableColumnType, TableColumnType, updateColumnTypeState,
+ DefaultTableColumnType,
+ initializeTableConfig,
+ isBtnGroupWidget,
+ isCheckboxWidget,
+ isInputWidget,
+ isOptionWidget,
+ isTextareaWidget,
parseTableOption,
+ resetTableOptionConfig,
+ TableColumnType,
+ updateColumnTypeState,
+ ValueType,
+ Widget,
} from './visualization-util';
const SETTING_TEMPLATE = require('./visualization-table-setting.html');
@@ -247,6 +254,29 @@ export default class TableVisualization extends Visualization {
gridOptions.enableSelectionBatchEvent = false;
}
+ append(row, columns) {
+ const gridOptions = this.getGridOptions();
+ this.setDynamicGridOptions(gridOptions, this.config);
+ // this.refreshGrid()
+ const gridElemId = this.getGridElemId();
+ const gridElem = angular.element(`#${gridElemId}`);
+
+ if (gridElem) {
+ const scope = this.getScope();
+
+ const columnNames = columns.map((c) => c.name);
+ let gridData = row.map((r) => {
+ return columnNames.reduce((acc, colName, index) => {
+ acc[colName] = r[index];
+ return acc;
+ }, {});
+ });
+ gridData.map((data) => {
+ scope[gridElemId].data.push(data);
+ });
+ }
+ }
+
render(tableData) {
const gridElemId = this.getGridElemId();
let gridElem = document.getElementById(gridElemId);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/720537aa/zeppelin-web/src/app/visualization/builtins/visualization-util.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/visualization/builtins/visualization-util.js b/zeppelin-web/src/app/visualization/builtins/visualization-util.js
index a82a18e..7feb129 100644
--- a/zeppelin-web/src/app/visualization/builtins/visualization-util.js
+++ b/zeppelin-web/src/app/visualization/builtins/visualization-util.js
@@ -100,7 +100,16 @@ export function initializeTableConfig(config, tableOptionSpecs) {
export function parseTableOption(specs, persistedTableOption) {
/** copy original params */
- const parsed = JSON.parse(JSON.stringify(persistedTableOption));
+ let parsed;
+ try {
+ parsed = JSON.parse(JSON.stringify(persistedTableOption));
+ } catch (e) {
+ // if not able to parse fall back to default values coming from specs
+ parsed = {};
+ for (let spec of specs) {
+ parsed[spec['name']] = spec['defaultValue'];
+ }
+ }
for (let i = 0; i < specs.length; i++) {
const s = specs[i];